Skip to content

Commit

Permalink
Use add_kubernetes_metadata IP & port matching in Packetbeat
Browse files Browse the repository at this point in the history
The code was there for Metricbeat already, this PR moves it to libbeat
and adapts Packetbeat to include it
  • Loading branch information
exekias committed Nov 24, 2017
1 parent 554701f commit a990016
Show file tree
Hide file tree
Showing 9 changed files with 175 additions and 164 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di

*Packetbeat*

- Configure good defaults for `add_kubernetes_metadata`. {pull}5707[5707]

*Winlogbeat*

==== Deprecated
Expand Down
79 changes: 79 additions & 0 deletions libbeat/processors/add_kubernetes_metadata/indexers.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
const (
ContainerIndexerName = "container"
PodNameIndexerName = "pod_name"
IPPortIndexerName = "ip_port"
)

// Indexer take known pods and generate all the metadata we need to enrich
Expand Down Expand Up @@ -248,3 +249,81 @@ func containerID(status PodContainerStatus) string {
}
return ""
}

// IPPortIndexer indexes pods based on all their host:port combinations
type IPPortIndexer struct {
genMeta GenMeta
}

// NewIPPortIndexer creates and returns a new indexer for pod IP & ports
func NewIPPortIndexer(_ common.Config, genMeta GenMeta) (Indexer, error) {
return &IPPortIndexer{genMeta: genMeta}, nil
}

// GetMetadata returns metadata for the given pod, if it matches the index
func (h *IPPortIndexer) GetMetadata(pod *Pod) []MetadataIndex {
commonMeta := h.genMeta.GenerateMetaData(pod)
hostPorts := h.GetIndexes(pod)
var metadata []MetadataIndex

if pod.Status.PodIP == "" {
return metadata
}
for i := 0; i < len(hostPorts); i++ {
dobreak := false
containerMeta := commonMeta.Clone()
for _, container := range pod.Spec.Containers {
ports := container.Ports

for _, port := range ports {
if port.ContainerPort == int64(0) {
continue
}
if strings.Index(hostPorts[i], fmt.Sprintf("%s:%d", pod.Status.PodIP, port.ContainerPort)) != -1 {
containerMeta["container"] = common.MapStr{
"name": container.Name,
}
dobreak = true
break
}
}

if dobreak {
break
}

}

metadata = append(metadata, MetadataIndex{
Index: hostPorts[i],
Data: containerMeta,
})
}

return metadata
}

// GetIndexes returns the indexes for the given Pod
func (h *IPPortIndexer) GetIndexes(pod *Pod) []string {
var hostPorts []string

ip := pod.Status.PodIP
if ip == "" {
return hostPorts
}
for _, container := range pod.Spec.Containers {
ports := container.Ports

for _, port := range ports {
if port.ContainerPort != int64(0) {
hostPorts = append(hostPorts, fmt.Sprintf("%s:%d", ip, port.ContainerPort))
} else {
hostPorts = append(hostPorts, ip)
}

}

}

return hostPorts
}
66 changes: 66 additions & 0 deletions libbeat/processors/add_kubernetes_metadata/indexers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,3 +237,69 @@ func TestFilteredGenMetaExclusion(t *testing.T) {
ok, _ = labelMap.HasKey("x")
assert.Equal(t, ok, false)
}

func TestIpPortIndexer(t *testing.T) {
var testConfig = common.NewConfig()

ipIndexer, err := NewIPPortIndexer(*testConfig, metagen)
assert.Nil(t, err)

podName := "testpod"
ns := "testns"
container := "container"
ip := "1.2.3.4"
port := int64(80)
pod := Pod{
Metadata: ObjectMeta{
Name: podName,
Namespace: ns,
Labels: map[string]string{
"labelkey": "labelvalue",
},
},
Spec: PodSpec{
Containers: make([]Container, 0),
},

Status: PodStatus{
PodIP: ip,
},
}

indexers := ipIndexer.GetMetadata(&pod)
indices := ipIndexer.GetIndexes(&pod)
assert.Equal(t, len(indexers), 0)
assert.Equal(t, len(indices), 0)
expected := common.MapStr{
"pod": common.MapStr{
"name": "testpod",
},
"namespace": "testns",
"labels": common.MapStr{
"labelkey": "labelvalue",
},
}

pod.Spec.Containers = []Container{
{
Name: container,
Ports: []ContainerPort{
{
Name: container,
ContainerPort: port,
},
},
},
}
expected["container"] = common.MapStr{"name": container}

indexers = ipIndexer.GetMetadata(&pod)
assert.Equal(t, len(indexers), 1)
assert.Equal(t, indexers[0].Index, fmt.Sprintf("%s:%d", ip, port))

indices = ipIndexer.GetIndexes(&pod)
assert.Equal(t, len(indices), 1)
assert.Equal(t, indices[0], fmt.Sprintf("%s:%d", ip, port))

assert.Equal(t, expected.String(), indexers[0].Data.String())
}
1 change: 1 addition & 0 deletions libbeat/processors/add_kubernetes_metadata/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func init() {
// Register default indexers
Indexing.AddIndexer(PodNameIndexerName, NewPodNameIndexer)
Indexing.AddIndexer(ContainerIndexerName, NewContainerIndexer)
Indexing.AddIndexer(IPPortIndexerName, NewIPPortIndexer)
Indexing.AddMatcher(FieldMatcherName, NewFieldMatcher)
Indexing.AddMatcher(FieldFormatMatcherName, NewFieldFormatMatcher)
}
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/beater/metricbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/mb/module"

// Add metricbeat specific processors
// Add metricbeat default processors
_ "github.com/elastic/beats/metricbeat/processor/add_kubernetes_metadata"
)

Expand Down
85 changes: 1 addition & 84 deletions metricbeat/processor/add_kubernetes_metadata/indexers.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,16 @@
package add_kubernetes_metadata

import (
"fmt"
"strings"

"github.com/elastic/beats/libbeat/common"
kubernetes "github.com/elastic/beats/libbeat/processors/add_kubernetes_metadata"
)

const (
IpPortIndexerName = "ip_port"
)

func init() {
// Register default indexers
kubernetes.Indexing.AddIndexer(IpPortIndexerName, NewIpPortIndexer)
cfg := common.NewConfig()

//Add IP Port Indexer as a default indexer
kubernetes.Indexing.AddDefaultIndexerConfig(IpPortIndexerName, *cfg)
kubernetes.Indexing.AddDefaultIndexerConfig(kubernetes.IPPortIndexerName, *cfg)

config := map[string]interface{}{
"lookup_fields": []string{"metricset.host"},
Expand All @@ -29,78 +21,3 @@ func init() {
kubernetes.Indexing.AddDefaultMatcherConfig(kubernetes.FieldMatcherName, *fieldCfg)
}
}

// IpPortIndexer indexes pods based on all their host:port combinations
type IpPortIndexer struct {
genMeta kubernetes.GenMeta
}

func NewIpPortIndexer(_ common.Config, genMeta kubernetes.GenMeta) (kubernetes.Indexer, error) {
return &IpPortIndexer{genMeta: genMeta}, nil
}

func (h *IpPortIndexer) GetMetadata(pod *kubernetes.Pod) []kubernetes.MetadataIndex {
commonMeta := h.genMeta.GenerateMetaData(pod)
hostPorts := h.GetIndexes(pod)
var metadata []kubernetes.MetadataIndex

if pod.Status.PodIP == "" {
return metadata
}
for i := 0; i < len(hostPorts); i++ {
dobreak := false
containerMeta := commonMeta.Clone()
for _, container := range pod.Spec.Containers {
ports := container.Ports

for _, port := range ports {
if port.ContainerPort == int64(0) {
continue
}
if strings.Index(hostPorts[i], fmt.Sprintf("%s:%d", pod.Status.PodIP, port.ContainerPort)) != -1 {
containerMeta["container"] = common.MapStr{
"name": container.Name,
}
dobreak = true
break
}
}

if dobreak {
break
}

}

metadata = append(metadata, kubernetes.MetadataIndex{
Index: hostPorts[i],
Data: containerMeta,
})
}

return metadata
}

func (h *IpPortIndexer) GetIndexes(pod *kubernetes.Pod) []string {
var hostPorts []string

ip := pod.Status.PodIP
if ip == "" {
return hostPorts
}
for _, container := range pod.Spec.Containers {
ports := container.Ports

for _, port := range ports {
if port.ContainerPort != int64(0) {
hostPorts = append(hostPorts, fmt.Sprintf("%s:%d", ip, port.ContainerPort))
} else {
hostPorts = append(hostPorts, ip)
}

}

}

return hostPorts
}
79 changes: 0 additions & 79 deletions metricbeat/processor/add_kubernetes_metadata/indexers_test.go

This file was deleted.

3 changes: 3 additions & 0 deletions packetbeat/beater/packetbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ import (
"github.com/elastic/beats/packetbeat/protos/udp"
"github.com/elastic/beats/packetbeat/publish"
"github.com/elastic/beats/packetbeat/sniffer"

// Add packetbeat default processors
_ "github.com/elastic/beats/packetbeat/processor/add_kubernetes_metadata"
)

// Beater object. Contains all objects needed to run the beat
Expand Down
Loading

0 comments on commit a990016

Please sign in to comment.