diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 822a62decdeb..61210cf2fa01 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -165,6 +165,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix TLS certificate DoS vulnerability. {pull}10302[10302] - Fix panic and file unlock in spool on atomic operation (arm, x86-32). File lock was not released when panic occurs, leading to the beat deadlocking on startup. {pull}10289[10289] - Fix encoding of timestamps when using disk spool. {issue}10099[10099] +- Fix stopping of modules started by kubernetes autodiscover. {pull}10476[10476] - Fix a issue when remote and local configuration didn't match when fetching configuration from Central Management. {issue}10587[10587] *Auditbeat* diff --git a/libbeat/autodiscover/providers/kubernetes/kubernetes.go b/libbeat/autodiscover/providers/kubernetes/kubernetes.go index 758f083f20ab..c3d9a03dda48 100644 --- a/libbeat/autodiscover/providers/kubernetes/kubernetes.go +++ b/libbeat/autodiscover/providers/kubernetes/kubernetes.go @@ -18,6 +18,7 @@ package kubernetes import ( + "fmt" "time" "github.com/gofrs/uuid" @@ -144,12 +145,16 @@ func (p *Provider) emitEvents(pod *kubernetes.Pod, flag string, containers []*ku containerstatuses []*kubernetes.PodContainerStatus) { host := pod.Status.GetPodIP() - // Do not emit events without host (container is still being configured) - if host == "" { + // If the container doesn't exist in the runtime or its network + // is not configured, it won't have an IP. Skip it as we cannot + // generate configs without host, and an update will arrive when + // the container is ready. + // If stopping, emit the event in any case to ensure cleanup. + if host == "" && flag != "stop" { return } - // Collect all container IDs and runtimes from status information. + // Collect all runtimes from status information. containerIDs := map[string]string{} runtimes := map[string]string{} for _, c := range containerstatuses { @@ -160,13 +165,18 @@ func (p *Provider) emitEvents(pod *kubernetes.Pod, flag string, containers []*ku // Emit container and port information for _, c := range containers { + // If it doesn't have an ID, container doesn't exist in + // the runtime, emit only an event if we are stopping, so + // we are sure of cleaning up configurations. cid := containerIDs[c.GetName()] - - // If there is a container ID that is empty then ignore it. It either means that the container is still starting - // up or the container is shutting down. - if cid == "" { + if cid == "" && flag != "stop" { continue } + + // This must be an id that doesn't depend on the state of the container + // so it works also on `stop` if containers have been already deleted. + eventID := fmt.Sprintf("%s.%s", pod.Metadata.GetUid(), c.GetName()) + cmeta := common.MapStr{ "id": cid, "name": c.GetName(), @@ -190,7 +200,7 @@ func (p *Provider) emitEvents(pod *kubernetes.Pod, flag string, containers []*ku if len(c.Ports) == 0 { event := bus.Event{ "provider": p.uuid, - "id": cid, + "id": eventID, flag: true, "host": host, "kubernetes": kubemeta, @@ -204,7 +214,7 @@ func (p *Provider) emitEvents(pod *kubernetes.Pod, flag string, containers []*ku for _, port := range c.Ports { event := bus.Event{ "provider": p.uuid, - "id": cid, + "id": eventID, flag: true, "host": host, "port": port.GetContainerPort(), diff --git a/libbeat/autodiscover/providers/kubernetes/kubernetes_test.go b/libbeat/autodiscover/providers/kubernetes/kubernetes_test.go index e6f8019dfb1c..38ba1ab1c019 100644 --- a/libbeat/autodiscover/providers/kubernetes/kubernetes_test.go +++ b/libbeat/autodiscover/providers/kubernetes/kubernetes_test.go @@ -160,6 +160,7 @@ func TestEmitEvent(t *testing.T) { uid := "005f3b90-4b9d-12f8-acf0-31020a840133" containerImage := "elastic/filebeat:6.3.0" node := "node" + cid := "005f3b90-4b9d-12f8-acf0-31020a840133.filebeat" UUID, err := uuid.NewV4() if err != nil { t.Fatal(err) @@ -204,7 +205,7 @@ func TestEmitEvent(t *testing.T) { Expected: bus.Event{ "start": true, "host": "127.0.0.1", - "id": "foobar", + "id": cid, "provider": UUID, "kubernetes": common.MapStr{ "container": common.MapStr{ @@ -270,6 +271,170 @@ func TestEmitEvent(t *testing.T) { }, Expected: nil, }, + { + Message: "Test pod without container id", + Flag: "start", + Pod: &v1.Pod{ + Metadata: &metav1.ObjectMeta{ + Name: &name, + Uid: &uid, + Namespace: &namespace, + Labels: map[string]string{}, + Annotations: map[string]string{}, + }, + Status: &v1.PodStatus{ + PodIP: &podIP, + ContainerStatuses: []*kubernetes.PodContainerStatus{ + { + Name: &name, + }, + }, + }, + Spec: &v1.PodSpec{ + NodeName: &node, + Containers: []*kubernetes.Container{ + { + Image: &containerImage, + Name: &name, + }, + }, + }, + }, + Expected: nil, + }, + { + Message: "Test stop pod without host", + Flag: "stop", + Pod: &v1.Pod{ + Metadata: &metav1.ObjectMeta{ + Name: &name, + Uid: &uid, + Namespace: &namespace, + Labels: map[string]string{}, + Annotations: map[string]string{}, + }, + Status: &v1.PodStatus{ + ContainerStatuses: []*kubernetes.PodContainerStatus{ + { + Name: &name, + }, + }, + }, + Spec: &v1.PodSpec{ + NodeName: &node, + Containers: []*kubernetes.Container{ + { + Image: &containerImage, + Name: &name, + }, + }, + }, + }, + Expected: bus.Event{ + "stop": true, + "host": "", + "id": cid, + "provider": UUID, + "kubernetes": common.MapStr{ + "container": common.MapStr{ + "id": "", + "name": "filebeat", + "image": "elastic/filebeat:6.3.0", + "runtime": "", + }, + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, + "node": common.MapStr{ + "name": "node", + }, + "namespace": "default", + "annotations": common.MapStr{}, + }, + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "namespace": "default", + "container": common.MapStr{ + "name": "filebeat", + }, "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, "node": common.MapStr{ + "name": "node", + }, + }, + }, + "config": []*common.Config{}, + }, + }, + { + Message: "Test stop pod without container id", + Flag: "stop", + Pod: &v1.Pod{ + Metadata: &metav1.ObjectMeta{ + Name: &name, + Uid: &uid, + Namespace: &namespace, + Labels: map[string]string{}, + Annotations: map[string]string{}, + }, + Status: &v1.PodStatus{ + PodIP: &podIP, + ContainerStatuses: []*kubernetes.PodContainerStatus{ + { + Name: &name, + }, + }, + }, + Spec: &v1.PodSpec{ + NodeName: &node, + Containers: []*kubernetes.Container{ + { + Image: &containerImage, + Name: &name, + }, + }, + }, + }, + Expected: bus.Event{ + "stop": true, + "host": "127.0.0.1", + "id": cid, + "provider": UUID, + "kubernetes": common.MapStr{ + "container": common.MapStr{ + "id": "", + "name": "filebeat", + "image": "elastic/filebeat:6.3.0", + "runtime": "", + }, + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, + "node": common.MapStr{ + "name": "node", + }, + "namespace": "default", + "annotations": common.MapStr{}, + }, + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "namespace": "default", + "container": common.MapStr{ + "name": "filebeat", + }, "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, "node": common.MapStr{ + "name": "node", + }, + }, + }, + "config": []*common.Config{}, + }, + }, } for _, test := range tests {