From 9930cb777f929ab2de9d9722bdc54c269d1428f6 Mon Sep 17 00:00:00 2001 From: Zhiheng Huang Date: Wed, 24 Jan 2018 00:59:43 +0800 Subject: [PATCH] libbeat: Add a new way to detect node for k8s which using machine-id (#6146) libbeat: Add a new way to detect node for k8s which using machine-id --- CHANGELOG.asciidoc | 1 + .../providers/kubernetes/kubernetes.go | 2 +- libbeat/common/kubernetes/util.go | 95 +++++++++++++++---- .../add_kubernetes_metadata/kubernetes.go | 2 +- 4 files changed, 78 insertions(+), 22 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index ce987f03a50e..29b63b666df3 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -142,6 +142,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di - Add autodiscover for kubernetes. {pull}6055[6055] - Add the abilility for the add_docker_metadata process to enrich based on process ID. {pull}6100[6100] - The `add_docker_metadata` and `add_kubernetes_metadata` processors are now GA, instead of Beta. {pull}6105[6105] +- The node name can be discovered automatically by machine-id matching when beat deployed outside kubernetes cluster. {pull}6146[6146] *Auditbeat* diff --git a/libbeat/autodiscover/providers/kubernetes/kubernetes.go b/libbeat/autodiscover/providers/kubernetes/kubernetes.go index 646638402cfa..eedd7462fe59 100644 --- a/libbeat/autodiscover/providers/kubernetes/kubernetes.go +++ b/libbeat/autodiscover/providers/kubernetes/kubernetes.go @@ -46,7 +46,7 @@ func AutodiscoverBuilder(bus bus.Bus, c *common.Config) (autodiscover.Provider, metagen := kubernetes.NewMetaGenerator(config.IncludeAnnotations, config.IncludeLabels, config.ExcludeLabels) - config.Host = kubernetes.DiscoverKubernetesNode(config.Host, client) + config.Host = kubernetes.DiscoverKubernetesNode(config.Host, config.InCluster, client) watcher := kubernetes.NewWatcher(client.CoreV1(), config.SyncPeriod, config.CleanupTimeout, config.Host) start := watcher.ListenStart() diff --git a/libbeat/common/kubernetes/util.go b/libbeat/common/kubernetes/util.go index 88c1c37048af..4219d543ad2f 100644 --- a/libbeat/common/kubernetes/util.go +++ b/libbeat/common/kubernetes/util.go @@ -5,6 +5,7 @@ import ( "fmt" "io/ioutil" "os" + "strings" "github.com/ericchiang/k8s" "github.com/ghodss/yaml" @@ -12,6 +13,8 @@ import ( "github.com/elastic/beats/libbeat/logp" ) +const defaultNode = "localhost" + // GetKubernetesClient returns a kubernetes client. If inCluster is true, it returns an // in cluster configuration based on the secrets mounted in the Pod. If kubeConfig is passed, // it parses the config file to get the config required to build a client. @@ -41,28 +44,80 @@ func GetKubernetesClient(inCluster bool, kubeConfig string) (client *k8s.Client, return client, nil } -// DiscoverKubernetesNode figures out the Kubernetes host to use. If host is provided in the config -// use it directly. Else use hostname of the pod which is the Pod ID to query the Pod and get the Node -// name from the specification. Else, return localhost as a default. -func DiscoverKubernetesNode(host string, client *k8s.Client) string { - ctx := context.Background() - if host == "" { - podName := os.Getenv("HOSTNAME") - logp.Info("Using pod name %s and namespace %s", podName, client.Namespace) - if podName == "localhost" { - host = "localhost" - } else { - pod, err := client.CoreV1().GetPod(ctx, podName, client.Namespace) - if err != nil { - logp.Err("Querying for pod failed with error: ", err.Error()) - logp.Info("Unable to find pod, setting host to localhost") - host = "localhost" - } else { - host = pod.Spec.GetNodeName() - } +// DiscoverKubernetesNode figures out the Kubernetes node to use. +// If host is provided in the config use it directly. +// If beat is deployed in k8s cluster, use hostname of pod which is pod name to query pod meta for node name. +// If beat is deployed outside k8s cluster, use machine-id to match against k8s nodes for node name. +func DiscoverKubernetesNode(host string, inCluster bool, client *k8s.Client) (node string) { + if host != "" { + logp.Info("kubernetes: Using node %s provided in the config", host) + return host + } + + if inCluster { + ns, err := inClusterNamespace() + if err != nil { + logp.Err("kubernetes: Couldn't get namespace when beat is in cluster with error: ", err.Error()) + return defaultNode + } + podName, err := os.Hostname() + if err != nil { + logp.Err("kubernetes: Couldn't get hostname as beat pod name in cluster with error: ", err.Error()) + return defaultNode + } + logp.Info("kubernetes: Using pod name %s and namespace %s to discover kubernetes node", podName, ns) + pod, err := client.CoreV1().GetPod(context.TODO(), podName, ns) + if err != nil { + logp.Err("kubernetes: Querying for pod failed with error: ", err.Error()) + return defaultNode + } + logp.Info("kubernetes: Using node %s discovered by in cluster pod node query", pod.Spec.GetNodeName()) + return pod.Spec.GetNodeName() + } + + mid := machineID() + if mid == "" { + logp.Err("kubernetes: Couldn't collect info from any of the files in /etc/machine-id /var/lib/dbus/machine-id") + return defaultNode + } + + nodes, err := client.CoreV1().ListNodes(context.TODO()) + if err != nil { + logp.Err("kubernetes: Querying for nodes failed with error: ", err.Error()) + return defaultNode + } + for _, n := range nodes.Items { + if n.GetStatus().GetNodeInfo().GetMachineID() == mid { + logp.Info("kubernetes: Using node %s discovered by machine-id matching", n.GetMetadata().GetName()) + return n.GetMetadata().GetName() + } + } + + logp.Warn("kubernetes: Couldn't discover node, using localhost as default") + return defaultNode +} +// machineID borrowed from cadvisor. +func machineID() string { + for _, file := range []string{ + "/etc/machine-id", + "/var/lib/dbus/machine-id", + } { + id, err := ioutil.ReadFile(file) + if err == nil { + return strings.TrimSpace(string(id)) } } + return "" +} - return host +// inClusterNamespace gets namespace from serviceaccount when beat is in cluster. +// code borrowed from client-go with some changes. +func inClusterNamespace() (string, error) { + // get namespace associated with the service account token, if available + data, err := ioutil.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace") + if err != nil { + return "", err + } + return strings.TrimSpace(string(data)), nil } diff --git a/libbeat/processors/add_kubernetes_metadata/kubernetes.go b/libbeat/processors/add_kubernetes_metadata/kubernetes.go index bf7e49c9fa85..7b0bf5cbbdee 100644 --- a/libbeat/processors/add_kubernetes_metadata/kubernetes.go +++ b/libbeat/processors/add_kubernetes_metadata/kubernetes.go @@ -89,7 +89,7 @@ func newKubernetesAnnotator(cfg *common.Config) (processors.Processor, error) { return nil, err } - config.Host = kubernetes.DiscoverKubernetesNode(config.Host, client) + config.Host = kubernetes.DiscoverKubernetesNode(config.Host, config.InCluster, client) logp.Debug("kubernetes", "Using host ", config.Host) logp.Debug("kubernetes", "Initializing watcher")