Skip to content

Commit

Permalink
Add container ECS fields in kubernetes metadata (elastic#20984)
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrsMark authored Sep 9, 2020
1 parent 3ecf7e6 commit bcb4e0c
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 37 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add leader election for Kubernetes autodiscover. {pull}20281[20281]
- Add capability of enriching process metadata with contianer id also for non-privileged containers in `add_process_metadata` processor. {pull}19767[19767]
- Add replace_fields config option in add_host_metadata for replacing host fields. {pull}20490[20490] {issue}20464[20464]
- Add container ECS fields in kubernetes metadata. {pull}20984[20984]

*Auditbeat*

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ require (
golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae
golang.org/x/text v0.3.2
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
golang.org/x/tools v0.0.0-20200806022845-90696ccdc692
golang.org/x/tools v0.0.0-20200904185747-39188db58858
google.golang.org/api v0.15.0
google.golang.org/genproto v0.0.0-20191230161307-f3c370f40bfb
google.golang.org/grpc v1.29.1
Expand Down
24 changes: 18 additions & 6 deletions libbeat/autodiscover/providers/kubernetes/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,19 +335,29 @@ func (p *pod) emitEvents(pod *kubernetes.Pod, flag string, containers []kubernet
// so it works also on `stop` if containers have been already deleted.
eventID := fmt.Sprintf("%s.%s", pod.GetObjectMeta().GetUID(), c.Name)

meta := p.metagen.Generate(
pod,
metadata.WithFields("container.name", c.Name),
metadata.WithFields("container.image", c.Image),
)

cmeta := common.MapStr{
"id": cid,
"name": c.Name,
"image": c.Image,
"id": cid,
"image": common.MapStr{
"name": c.Image,
},
"runtime": runtimes[c.Name],
}
meta := p.metagen.Generate(pod, metadata.WithFields("container.name", c.Name),
metadata.WithFields("container.image", c.Image))

// Information that can be used in discovering a workload
kubemeta := meta.Clone()
kubemeta["container"] = cmeta
kubemeta["annotations"] = annotations
kubemeta["container"] = common.MapStr{
"id": cid,
"name": c.Name,
"image": c.Image,
"runtime": runtimes[c.Name],
}
if len(nsAnn) != 0 {
kubemeta["namespace_annotations"] = nsAnn
}
Expand All @@ -364,6 +374,7 @@ func (p *pod) emitEvents(pod *kubernetes.Pod, flag string, containers []kubernet
"kubernetes": kubemeta,
"meta": common.MapStr{
"kubernetes": meta,
"container": cmeta,
},
}
events = append(events, event)
Expand All @@ -380,6 +391,7 @@ func (p *pod) emitEvents(pod *kubernetes.Pod, flag string, containers []kubernet
"kubernetes": kubemeta,
"meta": common.MapStr{
"kubernetes": meta,
"container": cmeta,
},
}
events = append(events, event)
Expand Down
72 changes: 54 additions & 18 deletions libbeat/autodiscover/providers/kubernetes/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,15 +446,21 @@ func TestEmitEvent(t *testing.T) {
"meta": common.MapStr{
"kubernetes": common.MapStr{
"namespace": "default",
"container": common.MapStr{
"name": "filebeat",
"image": "elastic/filebeat:6.3.0",
}, "pod": common.MapStr{
"pod": common.MapStr{
"name": "filebeat",
"uid": "005f3b90-4b9d-12f8-acf0-31020a840133",
}, "node": common.MapStr{
"name": "node",
},
"container": common.MapStr{
"name": "filebeat",
"image": "elastic/filebeat:6.3.0",
},
},
"container": common.MapStr{
"image": common.MapStr{"name": "elastic/filebeat:6.3.0"},
"id": "foobar",
"runtime": "docker",
},
},
"config": []*common.Config{},
Expand Down Expand Up @@ -565,15 +571,21 @@ func TestEmitEvent(t *testing.T) {
"meta": common.MapStr{
"kubernetes": common.MapStr{
"namespace": "default",
"container": common.MapStr{
"name": "filebeat",
"image": "elastic/filebeat:6.3.0",
}, "pod": common.MapStr{
"pod": common.MapStr{
"name": "filebeat",
"uid": "005f3b90-4b9d-12f8-acf0-31020a840133",
}, "node": common.MapStr{
"name": "node",
},
"container": common.MapStr{
"name": "filebeat",
"image": "elastic/filebeat:6.3.0",
},
},
"container": common.MapStr{
"image": common.MapStr{"name": "elastic/filebeat:6.3.0"},
"runtime": "docker",
"id": "foobar",
},
},
"config": []*common.Config{},
Expand Down Expand Up @@ -604,15 +616,21 @@ func TestEmitEvent(t *testing.T) {
"meta": common.MapStr{
"kubernetes": common.MapStr{
"namespace": "default",
"container": common.MapStr{
"name": "filebeat",
"image": "elastic/filebeat:6.3.0",
}, "pod": common.MapStr{
"pod": common.MapStr{
"name": "filebeat",
"uid": "005f3b90-4b9d-12f8-acf0-31020a840133",
}, "node": common.MapStr{
"name": "node",
},
"container": common.MapStr{
"name": "filebeat",
"image": "elastic/filebeat:6.3.0",
},
},
"container": common.MapStr{
"image": common.MapStr{"name": "elastic/filebeat:6.3.0"},
"id": "foobar",
"runtime": "docker",
},
},
"config": []*common.Config{},
Expand Down Expand Up @@ -769,15 +787,21 @@ func TestEmitEvent(t *testing.T) {
"meta": common.MapStr{
"kubernetes": common.MapStr{
"namespace": "default",
"container": common.MapStr{
"name": "filebeat",
"image": "elastic/filebeat:6.3.0",
}, "pod": common.MapStr{
"pod": common.MapStr{
"name": "filebeat",
"uid": "005f3b90-4b9d-12f8-acf0-31020a840133",
}, "node": common.MapStr{
"name": "node",
},
"container": common.MapStr{
"name": "filebeat",
"image": "elastic/filebeat:6.3.0",
},
},
"container": common.MapStr{
"image": common.MapStr{"name": "elastic/filebeat:6.3.0"},
"runtime": "",
"id": "",
},
},
"config": []*common.Config{},
Expand Down Expand Up @@ -874,13 +898,19 @@ func TestEmitEvent(t *testing.T) {
"container": common.MapStr{
"name": "filebeat",
"image": "elastic/filebeat:6.3.0",
}, "pod": common.MapStr{
},
"pod": common.MapStr{
"name": "filebeat",
"uid": "005f3b90-4b9d-12f8-acf0-31020a840133",
}, "node": common.MapStr{
"name": "node",
},
},
"container": common.MapStr{
"image": common.MapStr{"name": "elastic/filebeat:6.3.0"},
"id": "",
"runtime": "",
},
},
"config": []*common.Config{},
},
Expand Down Expand Up @@ -976,13 +1006,19 @@ func TestEmitEvent(t *testing.T) {
"container": common.MapStr{
"name": "filebeat",
"image": "elastic/filebeat:6.3.0",
}, "pod": common.MapStr{
},
"pod": common.MapStr{
"name": "filebeat",
"uid": "005f3b90-4b9d-12f8-acf0-31020a840133",
}, "node": common.MapStr{
"name": "node",
},
},
"container": common.MapStr{
"image": common.MapStr{"name": "elastic/filebeat:6.3.0"},
"runtime": "",
"id": "",
},
},
"config": []*common.Config{},
},
Expand Down
31 changes: 26 additions & 5 deletions libbeat/processors/add_kubernetes_metadata/indexers.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,14 +183,19 @@ func NewContainerIndexer(_ common.Config, metaGen metadata.MetaGen) (Indexer, er
func (c *ContainerIndexer) GetMetadata(pod *kubernetes.Pod) []MetadataIndex {
var m []MetadataIndex
for _, status := range append(pod.Status.ContainerStatuses, pod.Status.InitContainerStatuses...) {
cID := kubernetes.ContainerID(status)
cID, runtime := kubernetes.ContainerIDWithRuntime(status)
if cID == "" {
continue
}
m = append(m, MetadataIndex{
Index: cID,
Data: c.metaGen.Generate(pod, metadata.WithFields("container.name", status.Name),
metadata.WithFields("container.image", status.Image)),
Data: c.metaGen.Generate(
pod,
metadata.WithFields("container.name", status.Name),
metadata.WithFields("container.image", status.Image),
metadata.WithFields("container.id", cID),
metadata.WithFields("container.runtime", runtime),
),
})
}

Expand Down Expand Up @@ -234,14 +239,30 @@ func (h *IPPortIndexer) GetMetadata(pod *kubernetes.Pod) []MetadataIndex {
Data: h.metaGen.Generate(pod),
})

cIDs := make(map[string]string)
runtimes := make(map[string]string)
for _, status := range append(pod.Status.ContainerStatuses, pod.Status.InitContainerStatuses...) {
cID, runtime := kubernetes.ContainerIDWithRuntime(status)
if cID == "" {
continue
}
cIDs[status.Name] = cID
runtimes[status.Name] = runtime
}

for _, container := range pod.Spec.Containers {
for _, port := range container.Ports {
if port.ContainerPort != 0 {

m = append(m, MetadataIndex{
Index: fmt.Sprintf("%s:%d", pod.Status.PodIP, port.ContainerPort),
Data: h.metaGen.Generate(pod, metadata.WithFields("container.name", container.Name),
metadata.WithFields("container.image", container.Image)),
Data: h.metaGen.Generate(
pod,
metadata.WithFields("container.name", container.Name),
metadata.WithFields("container.image", container.Image),
metadata.WithFields("container.id", cIDs[container.Name]),
metadata.WithFields("container.runtime", runtimes[container.Name]),
),
})
}
}
Expand Down
24 changes: 18 additions & 6 deletions libbeat/processors/add_kubernetes_metadata/indexers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,14 +211,18 @@ func TestContainerIndexer(t *testing.T) {
assert.Equal(t, indices[1], "fghij")

expected["container"] = common.MapStr{
"name": container,
"image": containerImage,
"name": container,
"image": containerImage,
"id": "abcde",
"runtime": "docker",
}
assert.Equal(t, expected.String(), indexers[0].Data.String())

expected["container"] = common.MapStr{
"name": initContainer,
"image": initContainerImage,
"name": initContainer,
"image": initContainerImage,
"id": "fghij",
"runtime": "docker",
}
assert.Equal(t, expected.String(), indexers[1].Data.String())
}
Expand Down Expand Up @@ -372,7 +376,8 @@ func TestIpPortIndexer(t *testing.T) {
},

Status: v1.PodStatus{
PodIP: ip,
PodIP: ip,
ContainerStatuses: make([]kubernetes.PodContainerStatus, 0),
},
}

Expand Down Expand Up @@ -414,6 +419,13 @@ func TestIpPortIndexer(t *testing.T) {
},
},
}
pod.Status.ContainerStatuses = []kubernetes.PodContainerStatus{
{
Name: container,
Image: containerImage,
ContainerID: "docker://foobar",
},
}

nodeName := "testnode"
pod.Spec.NodeName = nodeName
Expand All @@ -429,6 +441,6 @@ func TestIpPortIndexer(t *testing.T) {
assert.Equal(t, fmt.Sprintf("%s:%d", ip, port), indices[1])

assert.Equal(t, expected.String(), indexers[0].Data.String())
expected["container"] = common.MapStr{"name": container, "image": containerImage}
expected["container"] = common.MapStr{"name": container, "image": containerImage, "id": "foobar", "runtime": "docker"}
assert.Equal(t, expected.String(), indexers[1].Data.String())
}
19 changes: 18 additions & 1 deletion libbeat/processors/add_kubernetes_metadata/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,25 @@ func (k *kubernetesAnnotator) Run(event *beat.Event) (*beat.Event, error) {
return event, nil
}

metaClone := metadata.Clone()
metaClone.Delete("container.name")
containerImage, err := metadata.GetValue("container.image")
if err == nil {
metaClone.Delete("container.image")
metaClone.Put("container.image.name", containerImage)
}
cmeta, err := metaClone.Clone().GetValue("container")
if err == nil {
event.Fields.DeepUpdate(common.MapStr{
"container": cmeta,
})
}

kubeMeta := metadata.Clone()
kubeMeta.Delete("container.id")
kubeMeta.Delete("container.runtime")
event.Fields.DeepUpdate(common.MapStr{
"kubernetes": metadata.Clone(),
"kubernetes": kubeMeta,
})

return event, nil
Expand Down

0 comments on commit bcb4e0c

Please sign in to comment.