Skip to content

Commit

Permalink
Adding support for include_labels and include_annotations in kubernet…
Browse files Browse the repository at this point in the history
…es processor (elastic#4043)
  • Loading branch information
vjsamuel authored and exekias committed Apr 19, 2017
1 parent e288cf6 commit d4222cc
Show file tree
Hide file tree
Showing 8 changed files with 206 additions and 112 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ https://github.com/elastic/beats/compare/v5.1.1...master[Check the HEAD diff]
- Add http endpoint. {pull}3717[3717]
- Updated to Go 1.8.1. {pull}4033[4033]
- Add kubernetes processor {pull}3888[3888]
- Add support for include_labels and include_annotations in kubernetes processor {pull}4043[4043]

*Filebeat*

Expand Down
32 changes: 4 additions & 28 deletions filebeat/processor/annotate/kubernetes/indexing.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,37 +11,13 @@ import (

func init() {
kubernetes.Indexing.AddMatcher(LogPathMatcherName, newLogsPathMatcher)
cfg := common.NewConfig()

indexer := kubernetes.Indexing.GetIndexer(kubernetes.ContainerIndexerName)
//Add a container indexer by default.
if indexer != nil {
cfg := common.NewConfig()
container, err := indexer(*cfg)

if err == nil {
kubernetes.Indexing.AddDefaultIndexer(container)
} else {
logp.Err("Unable to load indexer plugin due to error: %v", err)
}
} else {
logp.Err("Unable to get indexer plugin %s", kubernetes.ContainerIndexerName)
}
//Add a container indexer config by default.
kubernetes.Indexing.AddDefaultIndexerConfig(kubernetes.ContainerIndexerName, *cfg)

//Add a log path matcher which can extract container ID from the "source" field.
matcher := kubernetes.Indexing.GetMatcher(LogPathMatcherName)

if matcher != nil {
cfg := common.NewConfig()
logsPathMatcher, err := matcher(*cfg)
if err == nil {
kubernetes.Indexing.AddDefaultMatcher(logsPathMatcher)
} else {
logp.Err("Unable to load matcher plugin due to error: %v", err)
}
} else {
logp.Err("Unable to get matcher plugin %s", LogPathMatcherName)
}

kubernetes.Indexing.AddDefaultMatcherConfig(LogPathMatcherName, *cfg)
}

const LogPathMatcherName = "logs_path"
Expand Down
20 changes: 11 additions & 9 deletions libbeat/processors/annotate/kubernetes/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,17 @@ import (
)

type kubeAnnotatorConfig struct {
InCluster bool `config:"in_cluster"`
KubeConfig string `config:"kube_config"`
Host string `config:"host"`
Namespace string `config:"namespace"`
SyncPeriod time.Duration `config:"sync_period"`
Indexers PluginConfig `config:"indexers"`
Matchers PluginConfig `config:"matchers"`
DefaultMatchers Enabled `config:"default_matchers"`
DefaultIndexers Enabled `config:"default_indexers"`
InCluster bool `config:"in_cluster"`
KubeConfig string `config:"kube_config"`
Host string `config:"host"`
Namespace string `config:"namespace"`
SyncPeriod time.Duration `config:"sync_period"`
Indexers PluginConfig `config:"indexers"`
Matchers PluginConfig `config:"matchers"`
DefaultMatchers Enabled `config:"default_matchers"`
DefaultIndexers Enabled `config:"default_indexers"`
IncludeLabels []string `config:"include_labels"`
IncludeAnnotations []string `config:"include_annotations"`
}

type Enabled struct {
Expand Down
95 changes: 72 additions & 23 deletions libbeat/processors/annotate/kubernetes/indexing.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ type Matcher interface {
MetadataIndex(event common.MapStr) string
}

//GenMeta takes in pods to generate metadata for them
type GenMeta interface {
//GenerateMetaData generates metadata by taking in a pod as an input
GenerateMetaData(pod *corev1.Pod) common.MapStr
}

type Indexers struct {
sync.RWMutex
indexers []Indexer
Expand All @@ -64,11 +70,11 @@ type Register struct {
indexers map[string]IndexConstructor
matchers map[string]MatcherConstructor

defaultIndexers []Indexer
defaultMatchers []Matcher
defaultIndexerConfigs map[string]common.Config
defaultMatcherConfigs map[string]common.Config
}

type IndexConstructor func(config common.Config) (Indexer, error)
type IndexConstructor func(config common.Config, genMeta GenMeta) (Indexer, error)
type MatcherConstructor func(config common.Config) (Matcher, error)

// NewRegister creates and returns a new Register.
Expand All @@ -77,8 +83,8 @@ func NewRegister() *Register {
indexers: make(map[string]IndexConstructor, 0),
matchers: make(map[string]MatcherConstructor, 0),

defaultIndexers: make([]Indexer, 0),
defaultMatchers: make([]Matcher, 0),
defaultIndexerConfigs: make(map[string]common.Config, 0),
defaultMatcherConfigs: make(map[string]common.Config, 0),
}
}

Expand All @@ -97,13 +103,13 @@ func (r *Register) AddMatcher(name string, matcher MatcherConstructor) {
}

// AddIndexer to the register
func (r *Register) AddDefaultIndexer(indexer Indexer) {
r.defaultIndexers = append(r.defaultIndexers, indexer)
func (r *Register) AddDefaultIndexerConfig(name string, config common.Config) {
r.defaultIndexerConfigs[name] = config
}

// AddMatcher to the register
func (r *Register) AddDefaultMatcher(matcher Matcher) {
r.defaultMatchers = append(r.defaultMatchers, matcher)
func (r *Register) AddDefaultMatcherConfig(name string, config common.Config) {
r.defaultMatcherConfigs[name] = config
}

// AddIndexer to the register
Expand Down Expand Up @@ -167,28 +173,69 @@ func (m *Matchers) MetadataIndex(event common.MapStr) string {
return ""
}

// GenMetadata generates default metadata for the given pod
func GenMetadata(pod *corev1.Pod) common.MapStr {
type GenDefaultMeta struct {
annotations []string
labels []string
}

// GenerateMetaData generates default metadata for the given pod taking to account certain filters
func (g *GenDefaultMeta) GenerateMetaData(pod *corev1.Pod) common.MapStr {
labelMap := common.MapStr{}
for k, v := range pod.Metadata.Labels {
labelMap[k] = v
annotationsMap := common.MapStr{}

if len(g.labels) == 0 {
for k, v := range pod.Metadata.Labels {
labelMap[k] = v
}
} else {
labelMap = generateMapSubset(pod.Metadata.Labels, g.labels)
}
return common.MapStr{

annotationsMap = generateMapSubset(pod.Metadata.Annotations, g.annotations)

meta := common.MapStr{
"pod": pod.Metadata.GetName(),
"namespace": pod.Metadata.GetNamespace(),
"labels": labelMap,
}

if len(labelMap) != 0 {
meta["labels"] = labelMap
}

if len(annotationsMap) != 0 {
meta["annotations"] = annotationsMap
}

return meta
}

func generateMapSubset(input map[string]string, keys []string) common.MapStr {
output := common.MapStr{}
if input == nil {
return output
}

for _, key := range keys {
value, ok := input[key]
if ok {
output[key] = value
}
}

return output
}

// PodNameIndexer implements default indexer based on pod name
type PodNameIndexer struct{}
type PodNameIndexer struct {
genMeta GenMeta
}

func NewPodNameIndexer(_ common.Config) (Indexer, error) {
return &PodNameIndexer{}, nil
func NewPodNameIndexer(_ common.Config, genMeta GenMeta) (Indexer, error) {
return &PodNameIndexer{genMeta: genMeta}, nil
}

func (p *PodNameIndexer) GetMetadata(pod *corev1.Pod) []MetadataIndex {
data := GenMetadata(pod)
data := p.genMeta.GenerateMetaData(pod)
return []MetadataIndex{
{
Index: pod.Metadata.GetName(),
Expand All @@ -202,14 +249,16 @@ func (p *PodNameIndexer) GetIndexes(pod *corev1.Pod) []string {
}

// ContainerIndexer indexes pods based on all their containers IDs
type ContainerIndexer struct{}
type ContainerIndexer struct {
genMeta GenMeta
}

func NewContainerIndexer(_ common.Config) (Indexer, error) {
return &ContainerIndexer{}, nil
func NewContainerIndexer(_ common.Config, genMeta GenMeta) (Indexer, error) {
return &ContainerIndexer{genMeta: genMeta}, nil
}

func (c *ContainerIndexer) GetMetadata(pod *corev1.Pod) []MetadataIndex {
commonMeta := GenMetadata(pod)
commonMeta := c.genMeta.GenerateMetaData(pod)
containers := c.GetIndexes(pod)
var metadata []MetadataIndex
for i := 0; i < len(containers); i++ {
Expand Down
74 changes: 72 additions & 2 deletions libbeat/processors/annotate/kubernetes/indexing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ import (
"testing"
)

var metagen = &GenDefaultMeta{}

func TestPodIndexer(t *testing.T) {
var testConfig = common.NewConfig()

podIndexer, err := NewPodNameIndexer(*testConfig)
podIndexer, err := NewPodNameIndexer(*testConfig, metagen)
assert.Nil(t, err)

podName := "testpod"
Expand Down Expand Up @@ -49,7 +51,7 @@ func TestPodIndexer(t *testing.T) {
func TestContainerIndexer(t *testing.T) {
var testConfig = common.NewConfig()

conIndexer, err := NewContainerIndexer(*testConfig)
conIndexer, err := NewContainerIndexer(*testConfig, metagen)
assert.Nil(t, err)

podName := "testpod"
Expand Down Expand Up @@ -133,3 +135,71 @@ func TestFieldMatcher(t *testing.T) {
out = matcher.MetadataIndex(nonMatchInput)
assert.Equal(t, out, "")
}

func TestFilteredGenMeta(t *testing.T) {
var testConfig = common.NewConfig()

filteredGen := &GenDefaultMeta{}
podIndexer, err := NewPodNameIndexer(*testConfig, filteredGen)
assert.Nil(t, err)

podName := "testpod"
ns := "testns"
pod := corev1.Pod{
Metadata: &metav1.ObjectMeta{
Name: &podName,
Namespace: &ns,
Labels: map[string]string{
"foo": "bar",
"x": "y",
},
Annotations: map[string]string{
"a": "b",
"c": "d",
},
},
Spec: &corev1.PodSpec{},
}

indexers := podIndexer.GetMetadata(&pod)
assert.Equal(t, len(indexers), 1)

rawLabels, _ := indexers[0].Data["labels"]
assert.NotNil(t, rawLabels)

labelMap, ok := rawLabels.(common.MapStr)
assert.Equal(t, ok, true)
assert.Equal(t, len(labelMap), 2)

rawAnnotations := indexers[0].Data["annotations"]
assert.Nil(t, rawAnnotations)

filteredGen.labels = []string{"foo"}
filteredGen.annotations = []string{"a"}

podIndexer, err = NewPodNameIndexer(*testConfig, filteredGen)
assert.Nil(t, err)

indexers = podIndexer.GetMetadata(&pod)
assert.Equal(t, len(indexers), 1)

rawLabels, _ = indexers[0].Data["labels"]
assert.NotNil(t, rawLabels)

labelMap, ok = rawLabels.(common.MapStr)
assert.Equal(t, ok, true)
assert.Equal(t, len(labelMap), 1)

ok, _ = labelMap.HasKey("foo")
assert.Equal(t, ok, true)

rawAnnotations = indexers[0].Data["annotations"]
assert.NotNil(t, rawAnnotations)
annotationsMap, ok := rawAnnotations.(common.MapStr)

assert.Equal(t, ok, true)
assert.Equal(t, len(annotationsMap), 1)

ok, _ = annotationsMap.HasKey("a")
assert.Equal(t, ok, true)
}
Loading

0 comments on commit d4222cc

Please sign in to comment.