From ee0a38dfaf7f608234cef57871716a65ac4fc432 Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Mon, 11 Feb 2019 10:09:44 +0100 Subject: [PATCH] Fix stopping of modules started by kubernetes autodiscover (#10476) (#10642) Kubernetes autodiscover only emits events for containers with an ID in pods with an IP, but when a pod is being terminated, their containers can lack of ID and the pod itself can lack of IP. This leads to modules that are never stopped because the delete event that should stop them lacks of the needed information. This change makes two things to avoid this problem: * Don't require the pod to have an IP on stop events. * Use IDs for containers that don't depend on its state. (cherry picked from commit 15f2f263f885e717c9a52866b0a0e97ebcfcf07b) --- CHANGELOG.next.asciidoc | 1 + .../providers/kubernetes/kubernetes.go | 28 ++- .../providers/kubernetes/kubernetes_test.go | 167 +++++++++++++++++- 3 files changed, 186 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 822a62decdeb..61210cf2fa01 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -165,6 +165,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix TLS certificate DoS vulnerability. {pull}10302[10302] - Fix panic and file unlock in spool on atomic operation (arm, x86-32). File lock was not released when panic occurs, leading to the beat deadlocking on startup. {pull}10289[10289] - Fix encoding of timestamps when using disk spool. {issue}10099[10099] +- Fix stopping of modules started by kubernetes autodiscover. {pull}10476[10476] - Fix a issue when remote and local configuration didn't match when fetching configuration from Central Management. {issue}10587[10587] *Auditbeat* diff --git a/libbeat/autodiscover/providers/kubernetes/kubernetes.go b/libbeat/autodiscover/providers/kubernetes/kubernetes.go index 758f083f20ab..c3d9a03dda48 100644 --- a/libbeat/autodiscover/providers/kubernetes/kubernetes.go +++ b/libbeat/autodiscover/providers/kubernetes/kubernetes.go @@ -18,6 +18,7 @@ package kubernetes import ( + "fmt" "time" "github.com/gofrs/uuid" @@ -144,12 +145,16 @@ func (p *Provider) emitEvents(pod *kubernetes.Pod, flag string, containers []*ku containerstatuses []*kubernetes.PodContainerStatus) { host := pod.Status.GetPodIP() - // Do not emit events without host (container is still being configured) - if host == "" { + // If the container doesn't exist in the runtime or its network + // is not configured, it won't have an IP. Skip it as we cannot + // generate configs without host, and an update will arrive when + // the container is ready. + // If stopping, emit the event in any case to ensure cleanup. + if host == "" && flag != "stop" { return } - // Collect all container IDs and runtimes from status information. + // Collect all runtimes from status information. containerIDs := map[string]string{} runtimes := map[string]string{} for _, c := range containerstatuses { @@ -160,13 +165,18 @@ func (p *Provider) emitEvents(pod *kubernetes.Pod, flag string, containers []*ku // Emit container and port information for _, c := range containers { + // If it doesn't have an ID, container doesn't exist in + // the runtime, emit only an event if we are stopping, so + // we are sure of cleaning up configurations. cid := containerIDs[c.GetName()] - - // If there is a container ID that is empty then ignore it. It either means that the container is still starting - // up or the container is shutting down. - if cid == "" { + if cid == "" && flag != "stop" { continue } + + // This must be an id that doesn't depend on the state of the container + // so it works also on `stop` if containers have been already deleted. + eventID := fmt.Sprintf("%s.%s", pod.Metadata.GetUid(), c.GetName()) + cmeta := common.MapStr{ "id": cid, "name": c.GetName(), @@ -190,7 +200,7 @@ func (p *Provider) emitEvents(pod *kubernetes.Pod, flag string, containers []*ku if len(c.Ports) == 0 { event := bus.Event{ "provider": p.uuid, - "id": cid, + "id": eventID, flag: true, "host": host, "kubernetes": kubemeta, @@ -204,7 +214,7 @@ func (p *Provider) emitEvents(pod *kubernetes.Pod, flag string, containers []*ku for _, port := range c.Ports { event := bus.Event{ "provider": p.uuid, - "id": cid, + "id": eventID, flag: true, "host": host, "port": port.GetContainerPort(), diff --git a/libbeat/autodiscover/providers/kubernetes/kubernetes_test.go b/libbeat/autodiscover/providers/kubernetes/kubernetes_test.go index e6f8019dfb1c..38ba1ab1c019 100644 --- a/libbeat/autodiscover/providers/kubernetes/kubernetes_test.go +++ b/libbeat/autodiscover/providers/kubernetes/kubernetes_test.go @@ -160,6 +160,7 @@ func TestEmitEvent(t *testing.T) { uid := "005f3b90-4b9d-12f8-acf0-31020a840133" containerImage := "elastic/filebeat:6.3.0" node := "node" + cid := "005f3b90-4b9d-12f8-acf0-31020a840133.filebeat" UUID, err := uuid.NewV4() if err != nil { t.Fatal(err) @@ -204,7 +205,7 @@ func TestEmitEvent(t *testing.T) { Expected: bus.Event{ "start": true, "host": "127.0.0.1", - "id": "foobar", + "id": cid, "provider": UUID, "kubernetes": common.MapStr{ "container": common.MapStr{ @@ -270,6 +271,170 @@ func TestEmitEvent(t *testing.T) { }, Expected: nil, }, + { + Message: "Test pod without container id", + Flag: "start", + Pod: &v1.Pod{ + Metadata: &metav1.ObjectMeta{ + Name: &name, + Uid: &uid, + Namespace: &namespace, + Labels: map[string]string{}, + Annotations: map[string]string{}, + }, + Status: &v1.PodStatus{ + PodIP: &podIP, + ContainerStatuses: []*kubernetes.PodContainerStatus{ + { + Name: &name, + }, + }, + }, + Spec: &v1.PodSpec{ + NodeName: &node, + Containers: []*kubernetes.Container{ + { + Image: &containerImage, + Name: &name, + }, + }, + }, + }, + Expected: nil, + }, + { + Message: "Test stop pod without host", + Flag: "stop", + Pod: &v1.Pod{ + Metadata: &metav1.ObjectMeta{ + Name: &name, + Uid: &uid, + Namespace: &namespace, + Labels: map[string]string{}, + Annotations: map[string]string{}, + }, + Status: &v1.PodStatus{ + ContainerStatuses: []*kubernetes.PodContainerStatus{ + { + Name: &name, + }, + }, + }, + Spec: &v1.PodSpec{ + NodeName: &node, + Containers: []*kubernetes.Container{ + { + Image: &containerImage, + Name: &name, + }, + }, + }, + }, + Expected: bus.Event{ + "stop": true, + "host": "", + "id": cid, + "provider": UUID, + "kubernetes": common.MapStr{ + "container": common.MapStr{ + "id": "", + "name": "filebeat", + "image": "elastic/filebeat:6.3.0", + "runtime": "", + }, + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, + "node": common.MapStr{ + "name": "node", + }, + "namespace": "default", + "annotations": common.MapStr{}, + }, + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "namespace": "default", + "container": common.MapStr{ + "name": "filebeat", + }, "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, "node": common.MapStr{ + "name": "node", + }, + }, + }, + "config": []*common.Config{}, + }, + }, + { + Message: "Test stop pod without container id", + Flag: "stop", + Pod: &v1.Pod{ + Metadata: &metav1.ObjectMeta{ + Name: &name, + Uid: &uid, + Namespace: &namespace, + Labels: map[string]string{}, + Annotations: map[string]string{}, + }, + Status: &v1.PodStatus{ + PodIP: &podIP, + ContainerStatuses: []*kubernetes.PodContainerStatus{ + { + Name: &name, + }, + }, + }, + Spec: &v1.PodSpec{ + NodeName: &node, + Containers: []*kubernetes.Container{ + { + Image: &containerImage, + Name: &name, + }, + }, + }, + }, + Expected: bus.Event{ + "stop": true, + "host": "127.0.0.1", + "id": cid, + "provider": UUID, + "kubernetes": common.MapStr{ + "container": common.MapStr{ + "id": "", + "name": "filebeat", + "image": "elastic/filebeat:6.3.0", + "runtime": "", + }, + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, + "node": common.MapStr{ + "name": "node", + }, + "namespace": "default", + "annotations": common.MapStr{}, + }, + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "namespace": "default", + "container": common.MapStr{ + "name": "filebeat", + }, "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, "node": common.MapStr{ + "name": "node", + }, + }, + }, + "config": []*common.Config{}, + }, + }, } for _, test := range tests {