Skip to content

Commit

Permalink
Fix stopping of modules started by kubernetes autodiscover (elastic#1…
Browse files Browse the repository at this point in the history
…0476)

Kubernetes autodiscover only emits events for containers with
an ID in pods with an IP, but when a pod is being terminated,
their containers can lack of ID and the pod itself can lack of IP.
This leads to modules that are never stopped because the
delete event that should stop them lacks of the needed
information.

This change makes two things to avoid this problem:
    * Don't require the pod to have an IP on stop events.
    * Use IDs for containers that don't depend on its state.

(cherry picked from commit 15f2f26)
  • Loading branch information
jsoriano committed Feb 7, 2019
1 parent 3cd876c commit f45e3c4
Show file tree
Hide file tree
Showing 3 changed files with 187 additions and 10 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ https://github.com/elastic/beats/compare/1035569addc4a3b29ffa14f8a08c27c1ace16ef

*Affecting all Beats*

- Fix stopping of modules started by kubernetes autodiscover. {pull}10476[10476]

*Auditbeat*

- Enable System module config on Windows. {pull}10237[10237]
Expand Down
28 changes: 19 additions & 9 deletions libbeat/autodiscover/providers/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package kubernetes

import (
"fmt"
"time"

"github.com/gofrs/uuid"
Expand Down Expand Up @@ -144,12 +145,16 @@ func (p *Provider) emitEvents(pod *kubernetes.Pod, flag string, containers []*ku
containerstatuses []*kubernetes.PodContainerStatus) {
host := pod.Status.GetPodIP()

// Do not emit events without host (container is still being configured)
if host == "" {
// If the container doesn't exist in the runtime or its network
// is not configured, it won't have an IP. Skip it as we cannot
// generate configs without host, and an update will arrive when
// the container is ready.
// If stopping, emit the event in any case to ensure cleanup.
if host == "" && flag != "stop" {
return
}

// Collect all container IDs and runtimes from status information.
// Collect all runtimes from status information.
containerIDs := map[string]string{}
runtimes := map[string]string{}
for _, c := range containerstatuses {
Expand All @@ -160,13 +165,18 @@ func (p *Provider) emitEvents(pod *kubernetes.Pod, flag string, containers []*ku

// Emit container and port information
for _, c := range containers {
// If it doesn't have an ID, container doesn't exist in
// the runtime, emit only an event if we are stopping, so
// we are sure of cleaning up configurations.
cid := containerIDs[c.GetName()]

// If there is a container ID that is empty then ignore it. It either means that the container is still starting
// up or the container is shutting down.
if cid == "" {
if cid == "" && flag != "stop" {
continue
}

// This must be an id that doesn't depend on the state of the container
// so it works also on `stop` if containers have been already deleted.
eventID := fmt.Sprintf("%s.%s", pod.Metadata.GetUid(), c.GetName())

cmeta := common.MapStr{
"id": cid,
"name": c.GetName(),
Expand All @@ -190,7 +200,7 @@ func (p *Provider) emitEvents(pod *kubernetes.Pod, flag string, containers []*ku
if len(c.Ports) == 0 {
event := bus.Event{
"provider": p.uuid,
"id": cid,
"id": eventID,
flag: true,
"host": host,
"kubernetes": kubemeta,
Expand All @@ -204,7 +214,7 @@ func (p *Provider) emitEvents(pod *kubernetes.Pod, flag string, containers []*ku
for _, port := range c.Ports {
event := bus.Event{
"provider": p.uuid,
"id": cid,
"id": eventID,
flag: true,
"host": host,
"port": port.GetContainerPort(),
Expand Down
167 changes: 166 additions & 1 deletion libbeat/autodiscover/providers/kubernetes/kubernetes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ func TestEmitEvent(t *testing.T) {
uid := "005f3b90-4b9d-12f8-acf0-31020a840133"
containerImage := "elastic/filebeat:6.3.0"
node := "node"
cid := "005f3b90-4b9d-12f8-acf0-31020a840133.filebeat"
UUID, err := uuid.NewV4()
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -204,7 +205,7 @@ func TestEmitEvent(t *testing.T) {
Expected: bus.Event{
"start": true,
"host": "127.0.0.1",
"id": "foobar",
"id": cid,
"provider": UUID,
"kubernetes": common.MapStr{
"container": common.MapStr{
Expand Down Expand Up @@ -270,6 +271,170 @@ func TestEmitEvent(t *testing.T) {
},
Expected: nil,
},
{
Message: "Test pod without container id",
Flag: "start",
Pod: &v1.Pod{
Metadata: &metav1.ObjectMeta{
Name: &name,
Uid: &uid,
Namespace: &namespace,
Labels: map[string]string{},
Annotations: map[string]string{},
},
Status: &v1.PodStatus{
PodIP: &podIP,
ContainerStatuses: []*kubernetes.PodContainerStatus{
{
Name: &name,
},
},
},
Spec: &v1.PodSpec{
NodeName: &node,
Containers: []*kubernetes.Container{
{
Image: &containerImage,
Name: &name,
},
},
},
},
Expected: nil,
},
{
Message: "Test stop pod without host",
Flag: "stop",
Pod: &v1.Pod{
Metadata: &metav1.ObjectMeta{
Name: &name,
Uid: &uid,
Namespace: &namespace,
Labels: map[string]string{},
Annotations: map[string]string{},
},
Status: &v1.PodStatus{
ContainerStatuses: []*kubernetes.PodContainerStatus{
{
Name: &name,
},
},
},
Spec: &v1.PodSpec{
NodeName: &node,
Containers: []*kubernetes.Container{
{
Image: &containerImage,
Name: &name,
},
},
},
},
Expected: bus.Event{
"stop": true,
"host": "",
"id": cid,
"provider": UUID,
"kubernetes": common.MapStr{
"container": common.MapStr{
"id": "",
"name": "filebeat",
"image": "elastic/filebeat:6.3.0",
"runtime": "",
},
"pod": common.MapStr{
"name": "filebeat",
"uid": "005f3b90-4b9d-12f8-acf0-31020a840133",
},
"node": common.MapStr{
"name": "node",
},
"namespace": "default",
"annotations": common.MapStr{},
},
"meta": common.MapStr{
"kubernetes": common.MapStr{
"namespace": "default",
"container": common.MapStr{
"name": "filebeat",
}, "pod": common.MapStr{
"name": "filebeat",
"uid": "005f3b90-4b9d-12f8-acf0-31020a840133",
}, "node": common.MapStr{
"name": "node",
},
},
},
"config": []*common.Config{},
},
},
{
Message: "Test stop pod without container id",
Flag: "stop",
Pod: &v1.Pod{
Metadata: &metav1.ObjectMeta{
Name: &name,
Uid: &uid,
Namespace: &namespace,
Labels: map[string]string{},
Annotations: map[string]string{},
},
Status: &v1.PodStatus{
PodIP: &podIP,
ContainerStatuses: []*kubernetes.PodContainerStatus{
{
Name: &name,
},
},
},
Spec: &v1.PodSpec{
NodeName: &node,
Containers: []*kubernetes.Container{
{
Image: &containerImage,
Name: &name,
},
},
},
},
Expected: bus.Event{
"stop": true,
"host": "127.0.0.1",
"id": cid,
"provider": UUID,
"kubernetes": common.MapStr{
"container": common.MapStr{
"id": "",
"name": "filebeat",
"image": "elastic/filebeat:6.3.0",
"runtime": "",
},
"pod": common.MapStr{
"name": "filebeat",
"uid": "005f3b90-4b9d-12f8-acf0-31020a840133",
},
"node": common.MapStr{
"name": "node",
},
"namespace": "default",
"annotations": common.MapStr{},
},
"meta": common.MapStr{
"kubernetes": common.MapStr{
"namespace": "default",
"container": common.MapStr{
"name": "filebeat",
}, "pod": common.MapStr{
"name": "filebeat",
"uid": "005f3b90-4b9d-12f8-acf0-31020a840133",
}, "node": common.MapStr{
"name": "node",
},
},
},
"config": []*common.Config{},
},
},
}

for _, test := range tests {
Expand Down

0 comments on commit f45e3c4

Please sign in to comment.