diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 8b148b6f5cb..48fb33db0a3 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -99,6 +99,7 @@ https://github.com/elastic/beats/compare/v5.1.1...master[Check the HEAD diff] - Add http endpoint. {pull}3717[3717] - Updated to Go 1.8.1. {pull}4033[4033] - Add kubernetes processor {pull}3888[3888] +- Add support for include_labels and include_annotations in kubernetes processor {pull}4043[4043] *Filebeat* diff --git a/filebeat/processor/annotate/kubernetes/indexing.go b/filebeat/processor/annotate/kubernetes/indexing.go index 3ad61d8ce0a..981c5643564 100644 --- a/filebeat/processor/annotate/kubernetes/indexing.go +++ b/filebeat/processor/annotate/kubernetes/indexing.go @@ -11,37 +11,13 @@ import ( func init() { kubernetes.Indexing.AddMatcher(LogPathMatcherName, newLogsPathMatcher) + cfg := common.NewConfig() - indexer := kubernetes.Indexing.GetIndexer(kubernetes.ContainerIndexerName) - //Add a container indexer by default. - if indexer != nil { - cfg := common.NewConfig() - container, err := indexer(*cfg) - - if err == nil { - kubernetes.Indexing.AddDefaultIndexer(container) - } else { - logp.Err("Unable to load indexer plugin due to error: %v", err) - } - } else { - logp.Err("Unable to get indexer plugin %s", kubernetes.ContainerIndexerName) - } + //Add a container indexer config by default. + kubernetes.Indexing.AddDefaultIndexerConfig(kubernetes.ContainerIndexerName, *cfg) //Add a log path matcher which can extract container ID from the "source" field. - matcher := kubernetes.Indexing.GetMatcher(LogPathMatcherName) - - if matcher != nil { - cfg := common.NewConfig() - logsPathMatcher, err := matcher(*cfg) - if err == nil { - kubernetes.Indexing.AddDefaultMatcher(logsPathMatcher) - } else { - logp.Err("Unable to load matcher plugin due to error: %v", err) - } - } else { - logp.Err("Unable to get matcher plugin %s", LogPathMatcherName) - } - + kubernetes.Indexing.AddDefaultMatcherConfig(LogPathMatcherName, *cfg) } const LogPathMatcherName = "logs_path" diff --git a/libbeat/processors/annotate/kubernetes/config.go b/libbeat/processors/annotate/kubernetes/config.go index 874ef9d9c95..99f5a17ed2c 100644 --- a/libbeat/processors/annotate/kubernetes/config.go +++ b/libbeat/processors/annotate/kubernetes/config.go @@ -6,15 +6,17 @@ import ( ) type kubeAnnotatorConfig struct { - InCluster bool `config:"in_cluster"` - KubeConfig string `config:"kube_config"` - Host string `config:"host"` - Namespace string `config:"namespace"` - SyncPeriod time.Duration `config:"sync_period"` - Indexers PluginConfig `config:"indexers"` - Matchers PluginConfig `config:"matchers"` - DefaultMatchers Enabled `config:"default_matchers"` - DefaultIndexers Enabled `config:"default_indexers"` + InCluster bool `config:"in_cluster"` + KubeConfig string `config:"kube_config"` + Host string `config:"host"` + Namespace string `config:"namespace"` + SyncPeriod time.Duration `config:"sync_period"` + Indexers PluginConfig `config:"indexers"` + Matchers PluginConfig `config:"matchers"` + DefaultMatchers Enabled `config:"default_matchers"` + DefaultIndexers Enabled `config:"default_indexers"` + IncludeLabels []string `config:"include_labels"` + IncludeAnnotations []string `config:"include_annotations"` } type Enabled struct { diff --git a/libbeat/processors/annotate/kubernetes/indexing.go b/libbeat/processors/annotate/kubernetes/indexing.go index 79a67d20b03..63b05b781eb 100644 --- a/libbeat/processors/annotate/kubernetes/indexing.go +++ b/libbeat/processors/annotate/kubernetes/indexing.go @@ -48,6 +48,12 @@ type Matcher interface { MetadataIndex(event common.MapStr) string } +//GenMeta takes in pods to generate metadata for them +type GenMeta interface { + //GenerateMetaData generates metadata by taking in a pod as an input + GenerateMetaData(pod *corev1.Pod) common.MapStr +} + type Indexers struct { sync.RWMutex indexers []Indexer @@ -64,11 +70,11 @@ type Register struct { indexers map[string]IndexConstructor matchers map[string]MatcherConstructor - defaultIndexers []Indexer - defaultMatchers []Matcher + defaultIndexerConfigs map[string]common.Config + defaultMatcherConfigs map[string]common.Config } -type IndexConstructor func(config common.Config) (Indexer, error) +type IndexConstructor func(config common.Config, genMeta GenMeta) (Indexer, error) type MatcherConstructor func(config common.Config) (Matcher, error) // NewRegister creates and returns a new Register. @@ -77,8 +83,8 @@ func NewRegister() *Register { indexers: make(map[string]IndexConstructor, 0), matchers: make(map[string]MatcherConstructor, 0), - defaultIndexers: make([]Indexer, 0), - defaultMatchers: make([]Matcher, 0), + defaultIndexerConfigs: make(map[string]common.Config, 0), + defaultMatcherConfigs: make(map[string]common.Config, 0), } } @@ -97,13 +103,13 @@ func (r *Register) AddMatcher(name string, matcher MatcherConstructor) { } // AddIndexer to the register -func (r *Register) AddDefaultIndexer(indexer Indexer) { - r.defaultIndexers = append(r.defaultIndexers, indexer) +func (r *Register) AddDefaultIndexerConfig(name string, config common.Config) { + r.defaultIndexerConfigs[name] = config } // AddMatcher to the register -func (r *Register) AddDefaultMatcher(matcher Matcher) { - r.defaultMatchers = append(r.defaultMatchers, matcher) +func (r *Register) AddDefaultMatcherConfig(name string, config common.Config) { + r.defaultMatcherConfigs[name] = config } // AddIndexer to the register @@ -167,28 +173,69 @@ func (m *Matchers) MetadataIndex(event common.MapStr) string { return "" } -// GenMetadata generates default metadata for the given pod -func GenMetadata(pod *corev1.Pod) common.MapStr { +type GenDefaultMeta struct { + annotations []string + labels []string +} + +// GenerateMetaData generates default metadata for the given pod taking to account certain filters +func (g *GenDefaultMeta) GenerateMetaData(pod *corev1.Pod) common.MapStr { labelMap := common.MapStr{} - for k, v := range pod.Metadata.Labels { - labelMap[k] = v + annotationsMap := common.MapStr{} + + if len(g.labels) == 0 { + for k, v := range pod.Metadata.Labels { + labelMap[k] = v + } + } else { + labelMap = generateMapSubset(pod.Metadata.Labels, g.labels) } - return common.MapStr{ + + annotationsMap = generateMapSubset(pod.Metadata.Annotations, g.annotations) + + meta := common.MapStr{ "pod": pod.Metadata.GetName(), "namespace": pod.Metadata.GetNamespace(), - "labels": labelMap, } + + if len(labelMap) != 0 { + meta["labels"] = labelMap + } + + if len(annotationsMap) != 0 { + meta["annotations"] = annotationsMap + } + + return meta +} + +func generateMapSubset(input map[string]string, keys []string) common.MapStr { + output := common.MapStr{} + if input == nil { + return output + } + + for _, key := range keys { + value, ok := input[key] + if ok { + output[key] = value + } + } + + return output } // PodNameIndexer implements default indexer based on pod name -type PodNameIndexer struct{} +type PodNameIndexer struct { + genMeta GenMeta +} -func NewPodNameIndexer(_ common.Config) (Indexer, error) { - return &PodNameIndexer{}, nil +func NewPodNameIndexer(_ common.Config, genMeta GenMeta) (Indexer, error) { + return &PodNameIndexer{genMeta: genMeta}, nil } func (p *PodNameIndexer) GetMetadata(pod *corev1.Pod) []MetadataIndex { - data := GenMetadata(pod) + data := p.genMeta.GenerateMetaData(pod) return []MetadataIndex{ { Index: pod.Metadata.GetName(), @@ -202,14 +249,16 @@ func (p *PodNameIndexer) GetIndexes(pod *corev1.Pod) []string { } // ContainerIndexer indexes pods based on all their containers IDs -type ContainerIndexer struct{} +type ContainerIndexer struct { + genMeta GenMeta +} -func NewContainerIndexer(_ common.Config) (Indexer, error) { - return &ContainerIndexer{}, nil +func NewContainerIndexer(_ common.Config, genMeta GenMeta) (Indexer, error) { + return &ContainerIndexer{genMeta: genMeta}, nil } func (c *ContainerIndexer) GetMetadata(pod *corev1.Pod) []MetadataIndex { - commonMeta := GenMetadata(pod) + commonMeta := c.genMeta.GenerateMetaData(pod) containers := c.GetIndexes(pod) var metadata []MetadataIndex for i := 0; i < len(containers); i++ { diff --git a/libbeat/processors/annotate/kubernetes/indexing_test.go b/libbeat/processors/annotate/kubernetes/indexing_test.go index 31be39bc244..da9830cfac1 100644 --- a/libbeat/processors/annotate/kubernetes/indexing_test.go +++ b/libbeat/processors/annotate/kubernetes/indexing_test.go @@ -8,10 +8,12 @@ import ( "testing" ) +var metagen = &GenDefaultMeta{} + func TestPodIndexer(t *testing.T) { var testConfig = common.NewConfig() - podIndexer, err := NewPodNameIndexer(*testConfig) + podIndexer, err := NewPodNameIndexer(*testConfig, metagen) assert.Nil(t, err) podName := "testpod" @@ -49,7 +51,7 @@ func TestPodIndexer(t *testing.T) { func TestContainerIndexer(t *testing.T) { var testConfig = common.NewConfig() - conIndexer, err := NewContainerIndexer(*testConfig) + conIndexer, err := NewContainerIndexer(*testConfig, metagen) assert.Nil(t, err) podName := "testpod" @@ -133,3 +135,71 @@ func TestFieldMatcher(t *testing.T) { out = matcher.MetadataIndex(nonMatchInput) assert.Equal(t, out, "") } + +func TestFilteredGenMeta(t *testing.T) { + var testConfig = common.NewConfig() + + filteredGen := &GenDefaultMeta{} + podIndexer, err := NewPodNameIndexer(*testConfig, filteredGen) + assert.Nil(t, err) + + podName := "testpod" + ns := "testns" + pod := corev1.Pod{ + Metadata: &metav1.ObjectMeta{ + Name: &podName, + Namespace: &ns, + Labels: map[string]string{ + "foo": "bar", + "x": "y", + }, + Annotations: map[string]string{ + "a": "b", + "c": "d", + }, + }, + Spec: &corev1.PodSpec{}, + } + + indexers := podIndexer.GetMetadata(&pod) + assert.Equal(t, len(indexers), 1) + + rawLabels, _ := indexers[0].Data["labels"] + assert.NotNil(t, rawLabels) + + labelMap, ok := rawLabels.(common.MapStr) + assert.Equal(t, ok, true) + assert.Equal(t, len(labelMap), 2) + + rawAnnotations := indexers[0].Data["annotations"] + assert.Nil(t, rawAnnotations) + + filteredGen.labels = []string{"foo"} + filteredGen.annotations = []string{"a"} + + podIndexer, err = NewPodNameIndexer(*testConfig, filteredGen) + assert.Nil(t, err) + + indexers = podIndexer.GetMetadata(&pod) + assert.Equal(t, len(indexers), 1) + + rawLabels, _ = indexers[0].Data["labels"] + assert.NotNil(t, rawLabels) + + labelMap, ok = rawLabels.(common.MapStr) + assert.Equal(t, ok, true) + assert.Equal(t, len(labelMap), 1) + + ok, _ = labelMap.HasKey("foo") + assert.Equal(t, ok, true) + + rawAnnotations = indexers[0].Data["annotations"] + assert.NotNil(t, rawAnnotations) + annotationsMap, ok := rawAnnotations.(common.MapStr) + + assert.Equal(t, ok, true) + assert.Equal(t, len(annotationsMap), 1) + + ok, _ = annotationsMap.HasKey("a") + assert.Equal(t, ok, true) +} diff --git a/libbeat/processors/annotate/kubernetes/kubernetes.go b/libbeat/processors/annotate/kubernetes/kubernetes.go index 5559d127a8c..cdeafe12946 100644 --- a/libbeat/processors/annotate/kubernetes/kubernetes.go +++ b/libbeat/processors/annotate/kubernetes/kubernetes.go @@ -52,18 +52,43 @@ func newKubernetesAnnotator(cfg common.Config) (processors.Processor, error) { return nil, err } + //Load default indexer configs + if config.DefaultIndexers.Enabled == true { + Indexing.RLock() + for key, cfg := range Indexing.defaultIndexerConfigs { + config.Indexers = append(config.Indexers, map[string]common.Config{key: cfg}) + } + Indexing.RUnlock() + } + + //Load default matcher configs + if config.DefaultMatchers.Enabled == true { + Indexing.RLock() + for key, cfg := range Indexing.defaultMatcherConfigs { + config.Matchers = append(config.Matchers, map[string]common.Config{key: cfg}) + } + Indexing.RUnlock() + } + + metaGen := &GenDefaultMeta{ + labels: config.IncludeLabels, + annotations: config.IncludeAnnotations, + } + indexers := Indexers{ indexers: []Indexer{}, } + //Create all configured indexers for _, pluginConfigs := range config.Indexers { for name, pluginConfig := range pluginConfigs { indexFunc := Indexing.GetIndexer(name) if indexFunc == nil { logp.Warn("Unable to find indexing plugin %s", name) + continue } - indexer, err := indexFunc(pluginConfig) + indexer, err := indexFunc(pluginConfig, metaGen) if err != nil { logp.Warn("Unable to initialize indexing plugin %s due to error %v", name, err) } @@ -77,6 +102,7 @@ func newKubernetesAnnotator(cfg common.Config) (processors.Processor, error) { matchers: []Matcher{}, } + //Create all configured matchers for _, pluginConfigs := range config.Matchers { for name, pluginConfig := range pluginConfigs { matchFunc := Indexing.GetMatcher(name) @@ -94,18 +120,6 @@ func newKubernetesAnnotator(cfg common.Config) (processors.Processor, error) { } } - if config.DefaultIndexers.Enabled == true { - for _, indexer := range Indexing.defaultIndexers { - indexers.indexers = append(indexers.indexers, indexer) - } - } - - if config.DefaultMatchers.Enabled == true { - for _, matcher := range Indexing.defaultMatchers { - matchers.matchers = append(matchers.matchers, matcher) - } - } - if len(matchers.matchers) == 0 { return nil, fmt.Errorf("Can not initialize kubernetes plugin with zero matcher plugins") } diff --git a/metricbeat/processor/annotate/kubernetes/indexing.go b/metricbeat/processor/annotate/kubernetes/indexing.go index 3691c9f1c8f..a1e9087444c 100644 --- a/metricbeat/processor/annotate/kubernetes/indexing.go +++ b/metricbeat/processor/annotate/kubernetes/indexing.go @@ -5,7 +5,6 @@ import ( "strings" "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/processors/annotate/kubernetes" corev1 "github.com/ericchiang/k8s/api/v1" ) @@ -15,54 +14,34 @@ const ( ) func init() { - // Register default indexers kubernetes.Indexing.AddIndexer(IpPortIndexerName, newIpPortIndexer) + cfg := common.NewConfig() - indexer := kubernetes.Indexing.GetIndexer(IpPortIndexerName) + //Add IP Port Indexer as a default indexer + kubernetes.Indexing.AddDefaultIndexerConfig(IpPortIndexerName, *cfg) - if indexer != nil { - cfg := common.NewConfig() - ipPort, err := newIpPortIndexer(*cfg) - if err == nil { - kubernetes.Indexing.AddDefaultIndexer(ipPort) - } else { - logp.Err("Unable to load indexer plugin due to error: %v", err) - } - } else { - logp.Err("Unable to get indexer plugin %s", IpPortIndexerName) + config := map[string]interface{}{ + "lookup_fields": []string{"metricset.host"}, } - - matcher := kubernetes.Indexing.GetMatcher(kubernetes.FieldMatcherName) - - if matcher != nil { - config := map[string]interface{}{ - "lookup_fields": []string{"metricset.host"}, - } - fieldCfg, err := common.NewConfigFrom(config) - if err == nil { - matcher, err := kubernetes.NewFieldMatcher(*fieldCfg) - if err == nil { - kubernetes.Indexing.AddDefaultMatcher(matcher) - } - } else { - logp.Err("Unable to load matcher plugin due to error: %v", err) - } - } else { - logp.Err("Unable to get matcher plugin %s", kubernetes.FieldMatcherName) + fieldCfg, err := common.NewConfigFrom(config) + if err == nil { + //Add field matcher with field to lookup as metricset.host + kubernetes.Indexing.AddDefaultMatcherConfig(kubernetes.FieldMatcherName, *fieldCfg) } - } // IpPortIndexer indexes pods based on all their host:port combinations -type IpPortIndexer struct{} +type IpPortIndexer struct { + genMeta kubernetes.GenMeta +} -func newIpPortIndexer(_ common.Config) (kubernetes.Indexer, error) { - return &IpPortIndexer{}, nil +func newIpPortIndexer(_ common.Config, genMeta kubernetes.GenMeta) (kubernetes.Indexer, error) { + return &IpPortIndexer{genMeta: genMeta}, nil } func (h *IpPortIndexer) GetMetadata(pod *corev1.Pod) []kubernetes.MetadataIndex { - commonMeta := kubernetes.GenMetadata(pod) + commonMeta := h.genMeta.GenerateMetaData(pod) hostPorts := h.GetIndexes(pod) var metadata []kubernetes.MetadataIndex diff --git a/metricbeat/processor/annotate/kubernetes/indexing_test.go b/metricbeat/processor/annotate/kubernetes/indexing_test.go index 1724a89aa96..7777bbd93c2 100644 --- a/metricbeat/processor/annotate/kubernetes/indexing_test.go +++ b/metricbeat/processor/annotate/kubernetes/indexing_test.go @@ -3,16 +3,19 @@ package kubernetes import ( "fmt" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/processors/annotate/kubernetes" corev1 "github.com/ericchiang/k8s/api/v1" metav1 "github.com/ericchiang/k8s/apis/meta/v1" "github.com/stretchr/testify/assert" "testing" ) +var metagen = &kubernetes.GenDefaultMeta{} + func TestIpPortIndexer(t *testing.T) { var testConfig = common.NewConfig() - ipIndexer, err := newIpPortIndexer(*testConfig) + ipIndexer, err := newIpPortIndexer(*testConfig, metagen) assert.Nil(t, err) podName := "testpod"