Skip to content

Commit

Permalink
libbeat: Add a new way to detect node for k8s which using machine-id (#…
Browse files Browse the repository at this point in the history
…6146)

libbeat: Add a new way to detect node for k8s which using machine-id
  • Loading branch information
walktall authored and exekias committed Jan 23, 2018
1 parent 9039cf7 commit 9930cb7
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 22 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di
- Add autodiscover for kubernetes. {pull}6055[6055]
- Add the abilility for the add_docker_metadata process to enrich based on process ID. {pull}6100[6100]
- The `add_docker_metadata` and `add_kubernetes_metadata` processors are now GA, instead of Beta. {pull}6105[6105]
- The node name can be discovered automatically by machine-id matching when beat deployed outside kubernetes cluster. {pull}6146[6146]

*Auditbeat*

Expand Down
2 changes: 1 addition & 1 deletion libbeat/autodiscover/providers/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func AutodiscoverBuilder(bus bus.Bus, c *common.Config) (autodiscover.Provider,

metagen := kubernetes.NewMetaGenerator(config.IncludeAnnotations, config.IncludeLabels, config.ExcludeLabels)

config.Host = kubernetes.DiscoverKubernetesNode(config.Host, client)
config.Host = kubernetes.DiscoverKubernetesNode(config.Host, config.InCluster, client)
watcher := kubernetes.NewWatcher(client.CoreV1(), config.SyncPeriod, config.CleanupTimeout, config.Host)

start := watcher.ListenStart()
Expand Down
95 changes: 75 additions & 20 deletions libbeat/common/kubernetes/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@ import (
"fmt"
"io/ioutil"
"os"
"strings"

"github.com/ericchiang/k8s"
"github.com/ghodss/yaml"

"github.com/elastic/beats/libbeat/logp"
)

const defaultNode = "localhost"

// GetKubernetesClient returns a kubernetes client. If inCluster is true, it returns an
// in cluster configuration based on the secrets mounted in the Pod. If kubeConfig is passed,
// it parses the config file to get the config required to build a client.
Expand Down Expand Up @@ -41,28 +44,80 @@ func GetKubernetesClient(inCluster bool, kubeConfig string) (client *k8s.Client,
return client, nil
}

// DiscoverKubernetesNode figures out the Kubernetes host to use. If host is provided in the config
// use it directly. Else use hostname of the pod which is the Pod ID to query the Pod and get the Node
// name from the specification. Else, return localhost as a default.
func DiscoverKubernetesNode(host string, client *k8s.Client) string {
ctx := context.Background()
if host == "" {
podName := os.Getenv("HOSTNAME")
logp.Info("Using pod name %s and namespace %s", podName, client.Namespace)
if podName == "localhost" {
host = "localhost"
} else {
pod, err := client.CoreV1().GetPod(ctx, podName, client.Namespace)
if err != nil {
logp.Err("Querying for pod failed with error: ", err.Error())
logp.Info("Unable to find pod, setting host to localhost")
host = "localhost"
} else {
host = pod.Spec.GetNodeName()
}
// DiscoverKubernetesNode figures out the Kubernetes node to use.
// If host is provided in the config use it directly.
// If beat is deployed in k8s cluster, use hostname of pod which is pod name to query pod meta for node name.
// If beat is deployed outside k8s cluster, use machine-id to match against k8s nodes for node name.
func DiscoverKubernetesNode(host string, inCluster bool, client *k8s.Client) (node string) {
if host != "" {
logp.Info("kubernetes: Using node %s provided in the config", host)
return host
}

if inCluster {
ns, err := inClusterNamespace()
if err != nil {
logp.Err("kubernetes: Couldn't get namespace when beat is in cluster with error: ", err.Error())
return defaultNode
}
podName, err := os.Hostname()
if err != nil {
logp.Err("kubernetes: Couldn't get hostname as beat pod name in cluster with error: ", err.Error())
return defaultNode
}
logp.Info("kubernetes: Using pod name %s and namespace %s to discover kubernetes node", podName, ns)
pod, err := client.CoreV1().GetPod(context.TODO(), podName, ns)
if err != nil {
logp.Err("kubernetes: Querying for pod failed with error: ", err.Error())
return defaultNode
}
logp.Info("kubernetes: Using node %s discovered by in cluster pod node query", pod.Spec.GetNodeName())
return pod.Spec.GetNodeName()
}

mid := machineID()
if mid == "" {
logp.Err("kubernetes: Couldn't collect info from any of the files in /etc/machine-id /var/lib/dbus/machine-id")
return defaultNode
}

nodes, err := client.CoreV1().ListNodes(context.TODO())
if err != nil {
logp.Err("kubernetes: Querying for nodes failed with error: ", err.Error())
return defaultNode
}
for _, n := range nodes.Items {
if n.GetStatus().GetNodeInfo().GetMachineID() == mid {
logp.Info("kubernetes: Using node %s discovered by machine-id matching", n.GetMetadata().GetName())
return n.GetMetadata().GetName()
}
}

logp.Warn("kubernetes: Couldn't discover node, using localhost as default")
return defaultNode
}

// machineID borrowed from cadvisor.
func machineID() string {
for _, file := range []string{
"/etc/machine-id",
"/var/lib/dbus/machine-id",
} {
id, err := ioutil.ReadFile(file)
if err == nil {
return strings.TrimSpace(string(id))
}
}
return ""
}

return host
// inClusterNamespace gets namespace from serviceaccount when beat is in cluster.
// code borrowed from client-go with some changes.
func inClusterNamespace() (string, error) {
// get namespace associated with the service account token, if available
data, err := ioutil.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace")
if err != nil {
return "", err
}
return strings.TrimSpace(string(data)), nil
}
2 changes: 1 addition & 1 deletion libbeat/processors/add_kubernetes_metadata/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func newKubernetesAnnotator(cfg *common.Config) (processors.Processor, error) {
return nil, err
}

config.Host = kubernetes.DiscoverKubernetesNode(config.Host, client)
config.Host = kubernetes.DiscoverKubernetesNode(config.Host, config.InCluster, client)

logp.Debug("kubernetes", "Using host ", config.Host)
logp.Debug("kubernetes", "Initializing watcher")
Expand Down

0 comments on commit 9930cb7

Please sign in to comment.