Skip to content

Commit

Permalink
[extension/observer/k8s] Add k8s.node discovery (#6820)
Browse files Browse the repository at this point in the history
These changes adopt the new k8s.node endpoint type in the k8s_observer
and provide new config fields `observe_nodes` and `observe_pods` to
toggle desired Endpoint type discovery. The existing `node` config
field is no longer mandatory and missing values will result in all
available nodes and pods in being reported.

They do not include adoption in the receiver creator, as that will be
addressed in a subsequent PR.
  • Loading branch information
rmfitzpatrick authored Dec 21, 2021
1 parent 36a284a commit ebea5bc
Show file tree
Hide file tree
Showing 24 changed files with 878 additions and 327 deletions.
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ extension/httpforwarder/ @open-telemetry/collector-c
extension/oauth2clientauthextension/ @open-telemetry/collector-contrib-approvers @jpkrohling @pavankrish123
extension/observer/ @open-telemetry/collector-contrib-approvers @asuresh4 @jrcamp
extension/observer/ecstaskobserver/ @open-telemetry/collector-contrib-approvers @rmfitzpatrick
extension/observer/k8sobserver/ @open-telemetry/collector-contrib-approvers @rmfitzpatrick @dmitryax
extension/oidcauthextension/ @open-telemetry/collector-contrib-approvers @jpkrohling

internal/aws/ @open-telemetry/collector-contrib-approvers @anuraaga @mxiamxia
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
- `prometheusremotewriteexporter`: Handling Staleness flag from OTLP (#6679)
- `mysqlreceiver`: Add Integration test (#6916)
- `datadogexporter`: Add compatibility with ECS Fargate semantic conventions (#6670)
- `k8s_observer`: discover k8s.node endpoints (#6820)

## 🛑 Breaking changes 🛑

Expand Down
12 changes: 0 additions & 12 deletions extension/observer/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,15 +229,6 @@ type K8sNode struct {
Labels map[string]string
// KubeletEndpointPort is the node status object's DaemonEndpoints.KubeletEndpoint.Port value
KubeletEndpointPort uint16
// Spec represents the node Spec object.
// It is a json object that is equivalent to the output of `kubectl get node <node> -o jsonpath='{.spec}'`
Spec map[string]interface{}
// Metadata represents the node ObjectMeta object.
// It is a json object that is equivalent to the output of `kubectl get node <node> -o jsonpath='{.metadata}'`
Metadata map[string]interface{}
// Status represents the node Status object.
// It is a json object that is equivalent to the output of `kubectl get node <node> -o jsonpath='{.status}'`
Status map[string]interface{}
}

func (n *K8sNode) Env() EndpointEnv {
Expand All @@ -246,9 +237,6 @@ func (n *K8sNode) Env() EndpointEnv {
"uid": n.UID,
"annotations": n.Annotations,
"labels": n.Labels,
"metadata": n.Metadata,
"spec": n.Spec,
"status": n.Status,
"hostname": n.Hostname,
"external_ip": n.ExternalIP,
"internal_ip": n.InternalIP,
Expand Down
18 changes: 0 additions & 18 deletions extension/observer/endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,15 +180,6 @@ func TestEndpointEnv(t *testing.T) {
"label_key": "label_val",
},
KubeletEndpointPort: 1234,
Spec: map[string]interface{}{
"spec": "spec_val",
},
Metadata: map[string]interface{}{
"metadata": "metadata_val",
},
Status: map[string]interface{}{
"status": "status_val",
},
},
},
want: EndpointEnv{
Expand All @@ -208,15 +199,6 @@ func TestEndpointEnv(t *testing.T) {
"labels": map[string]string{
"label_key": "label_val",
},
"spec": map[string]interface{}{
"spec": "spec_val",
},
"metadata": map[string]interface{}{
"metadata": "metadata_val",
},
"status": map[string]interface{}{
"status": "status_val",
},
},
wantErr: false,
},
Expand Down
54 changes: 43 additions & 11 deletions extension/observer/k8sobserver/README.md
Original file line number Diff line number Diff line change
@@ -1,16 +1,41 @@
# Kubernetes Observer

The k8sobserver uses the Kubernetes API to discover pods running on the local node. This assumes the collector is deployed in the "agent" model where it is running on each individual node/host instance.
The `k8s_observer` is a [Receiver Creator](../../../receiver/receivercreator/README.md)-compatible "watch observer" that will detect and report
Kubernetes pod, port, and node endpoints via the Kubernetes API.

## Config

**auth_type**
## Example Config

How to authenticate to the K8s API server. This can be one of `none` (for no auth), `serviceAccount` (to use the standard service account token provided to the agent pod), or `kubeConfig` to use credentials from `~/.kube/config`.

**node**
```yaml
extensions:
k8s_observer:
auth_type: serviceAccount
node: ${K8S_NODE_NAME}
observe_pods: true
observe_nodes: true

receivers:
receiver_creator:
watch_observers: [k8s_observer]
receivers:
redis:
rule: type == "port" && pod.name matches "redis"
config:
password: '`pod.labels["SECRET"]`'
kubeletstats:
rule: type == "k8s.node"
config:
auth_type: serviceAccount
collection_interval: 10s
endpoint: "`endpoint`:`kubelet_endpoint_port`"
extra_metadata_labels:
- container.id
metric_groups:
- container
- pod
- node
```
Node should be set to the node name to limit discovered endpoints to. For example, node name can be set using the downward API inside the collector pod spec as follows:
The `node` field can be set to the node name to limit discovered endpoints. For example, its name value can be obtained using the downward API inside a Collector pod spec as follows:

```yaml
env:
Expand All @@ -20,8 +45,15 @@ env:
fieldPath: spec.nodeName
```

Then set this value to `${K8S_NODE_NAME}` in the configuration.
This spec-determined value would then be available via the `${K8S_NODE_NAME}` usage in the observer configuration.

## Config

The full list of settings exposed for this exporter are documented [here](./config.go)
with detailed sample configurations [here](./testdata/config.yaml).
All fields are optional.

| Name | Type | Default | Docs |
| ---- | ---- | ------- | ---- |
| auth_type | string | `serviceAccount` | How to authenticate to the K8s API server. This can be one of `none` (for no auth), `serviceAccount` (to use the standard service account token provided to the agent pod), or `kubeConfig` to use credentials from `~/.kube/config`. |
| node | string | <no value> | The node name to limit the discovery of pod, port, and node endpoints. Providing no value (the default) results in discovering endpoints for all available nodes. |
| observe_pods | bool | `true` | Whether to report observer pod and port endpoints. If `true` and `node` is specified it will only discover pod and port endpoints whose `spec.nodeName` matches the provided node name. If `true` and `node` isn't specified, it will discover all available pod and port endpoints. Please note that Collector connectivity to pods from other nodes is dependent on your cluster configuration and isn't guaranteed. |
| observe_nodes | bool | `false` | Whether to report observer k8s.node endpoints. If `true` and `node` is specified it will only discover node endpoints whose `metadata.name` matches the provided node name. If `true` and `node` isn't specified, it will discover all available node endpoints. Please note that Collector connectivity to nodes is dependent on your cluster configuration and isn't guaranteed.|
19 changes: 17 additions & 2 deletions extension/observer/k8sobserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package k8sobserver // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/k8sobserver"

import (
"fmt"

"go.opentelemetry.io/collector/config"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
Expand All @@ -25,8 +27,10 @@ type Config struct {
config.ExtensionSettings `mapstructure:",squash"`
k8sconfig.APIConfig `mapstructure:",squash"`

// Node should be set to the node name to limit discovered endpoints to. For example, node name can
// be set using the downward API inside the collector pod spec as follows:
// Node is the node name to limit the discovery of pod, port, and node endpoints.
// Providing no value (the default) results in discovering endpoints for all available nodes.
// For example, node name can be set using the downward API inside the collector
// pod spec as follows:
//
// env:
// - name: K8S_NODE_NAME
Expand All @@ -36,9 +40,20 @@ type Config struct {
//
// Then set this value to ${K8S_NODE_NAME} in the configuration.
Node string `mapstructure:"node"`
// ObservePods determines whether to report observer pod and port endpoints. If `true` and Node is specified
// it will only discover pod and port endpoints whose `spec.nodeName` matches the provided node name. If `true` and
// Node isn't specified, it will discover all available pod and port endpoints. `true` by default.
ObservePods bool `mapstructure:"observe_pods"`
// ObserveNodes determines whether to report observer k8s.node endpoints. If `true` and Node is specified
// it will only discover node endpoints whose `metadata.name` matches the provided node name. If `true` and
// Node isn't specified, it will discover all available node endpoints. `false` by default.
ObserveNodes bool `mapstructure:"observe_nodes"`
}

// Validate checks if the extension configuration is valid
func (cfg *Config) Validate() error {
if !cfg.ObservePods && !cfg.ObserveNodes {
return fmt.Errorf("one of observe_pods and observe_nodes must be true")
}
return cfg.APIConfig.Validate()
}
56 changes: 38 additions & 18 deletions extension/observer/k8sobserver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,35 +35,55 @@ func TestLoadConfig(t *testing.T) {
factories.Extensions[typeStr] = factory
cfg, err := configtest.LoadConfigAndValidate(path.Join(".", "testdata", "config.yaml"), factories)

require.Nil(t, err)
require.NoError(t, err)
require.NotNil(t, cfg)

require.Len(t, cfg.Extensions, 2)
require.Len(t, cfg.Extensions, 3)

ext0 := cfg.Extensions[config.NewComponentID(typeStr)]
assert.EqualValues(t, factory.CreateDefaultConfig(), ext0)
defaultConfig := cfg.Extensions[config.NewComponentID(typeStr)]
assert.EqualValues(t, factory.CreateDefaultConfig(), defaultConfig)

ext1 := cfg.Extensions[config.NewComponentIDWithName(typeStr, "1")]
ownNodeOnly := cfg.Extensions[config.NewComponentIDWithName(typeStr, "own-node-only")]
assert.EqualValues(t,
&Config{
ExtensionSettings: config.NewExtensionSettings(config.NewComponentIDWithName(typeStr, "1")),
ExtensionSettings: config.NewExtensionSettings(config.NewComponentIDWithName(typeStr, "own-node-only")),
Node: "node-1",
APIConfig: k8sconfig.APIConfig{AuthType: k8sconfig.AuthTypeKubeConfig},
ObservePods: true,
},
ext1)
ownNodeOnly)

observeAll := cfg.Extensions[config.NewComponentIDWithName(typeStr, "observe-all")]
assert.EqualValues(t,
&Config{
ExtensionSettings: config.NewExtensionSettings(config.NewComponentIDWithName(typeStr, "observe-all")),
Node: "",
APIConfig: k8sconfig.APIConfig{AuthType: k8sconfig.AuthTypeNone},
ObservePods: true,
ObserveNodes: true,
},
observeAll)

}

func TestValidate(t *testing.T) {
cfg := &Config{
ExtensionSettings: config.NewExtensionSettings(config.NewComponentIDWithName(typeStr, "1")),
Node: "node-1",
APIConfig: k8sconfig.APIConfig{AuthType: k8sconfig.AuthTypeKubeConfig},
}
func TestInvalidAuth(t *testing.T) {
factories, err := componenttest.NopFactories()
assert.NoError(t, err)

err := cfg.Validate()
require.Nil(t, err)
factory := NewFactory()
factories.Extensions[typeStr] = factory
cfg, err := configtest.LoadConfigAndValidate(path.Join("testdata/invalid_auth.yaml"), factories)
require.NotNil(t, cfg)
require.EqualError(t, err, `extension "k8s_observer" has invalid configuration: invalid authType for kubernetes: not a real auth type`)
}

func TestInvalidNoObserving(t *testing.T) {
factories, err := componenttest.NopFactories()
assert.NoError(t, err)

cfg.APIConfig.AuthType = "invalid"
err = cfg.Validate()
require.NotNil(t, err)
factory := NewFactory()
factories.Extensions[typeStr] = factory
cfg, err := configtest.LoadConfigAndValidate(path.Join("testdata/invalid_no_observing.yaml"), factories)
require.NotNil(t, cfg)
require.EqualError(t, err, `extension "k8s_observer" has invalid configuration: one of observe_pods and observe_nodes must be true`)
}
85 changes: 72 additions & 13 deletions extension/observer/k8sobserver/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,39 +18,98 @@ import (
"context"

"go.opentelemetry.io/collector/component"
"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/tools/cache"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
)

var _ component.Extension = (*k8sObserver)(nil)
var _ observer.Observable = (*k8sObserver)(nil)

type k8sObserver struct {
logger *zap.Logger
informer cache.SharedInformer
stop chan struct{}
config *Config
telemetry component.TelemetrySettings
podInformer cache.SharedInformer
podListerWatcher cache.ListerWatcher
nodeInformer cache.SharedInformer
nodeListerWatcher cache.ListerWatcher
stop chan struct{}
config *Config
}

// Start will populate the cache.SharedInformers for pods and nodes as configured and run them as goroutines.
func (k *k8sObserver) Start(ctx context.Context, host component.Host) error {
go k.informer.Run(k.stop)
if k.podListerWatcher != nil && k.podInformer == nil {
k.telemetry.Logger.Debug("creating and starting pod informer")
k.podInformer = cache.NewSharedInformer(k.podListerWatcher, &v1.Pod{}, 0)
go k.podInformer.Run(k.stop)
}
if k.nodeListerWatcher != nil && k.nodeInformer == nil {
k.telemetry.Logger.Debug("creating and starting node informer")
k.nodeInformer = cache.NewSharedInformer(k.nodeListerWatcher, &v1.Node{}, 0)
go k.nodeInformer.Run(k.stop)
}
return nil
}

// Shutdown tells any cache.SharedInformers to stop running.
func (k *k8sObserver) Shutdown(ctx context.Context) error {
close(k.stop)
return nil
}

var _ (component.Extension) = (*k8sObserver)(nil)

// ListAndWatch notifies watcher with the current state and sends subsequent state changes.
// ListAndWatch sets the respective cache.SharedInformer event handlers to inform the
// provided observer.Notify listener of pod and node entity updates
func (k *k8sObserver) ListAndWatch(listener observer.Notify) {
k.informer.AddEventHandler(&handler{watcher: listener, idNamespace: k.config.ID().String()})
if k.podInformer != nil {
k.podInformer.AddEventHandler(&handler{listener: listener, idNamespace: k.config.ID().String(), logger: k.telemetry.Logger})
}
if k.nodeInformer != nil {
k.nodeInformer.AddEventHandler(&handler{listener: listener, idNamespace: k.config.ID().String(), logger: k.telemetry.Logger})
}
}

// newObserver creates a new k8s observer extension.
func newObserver(logger *zap.Logger, config *Config, listWatch cache.ListerWatcher) (component.Extension, error) {
informer := cache.NewSharedInformer(listWatch, &v1.Pod{}, 0)
return &k8sObserver{logger: logger, informer: informer, stop: make(chan struct{}), config: config}, nil
func newObserver(config *Config, telemetrySettings component.TelemetrySettings) (component.Extension, error) {
client, err := k8sconfig.MakeClient(config.APIConfig)
if err != nil {
return nil, err
}
restClient := client.CoreV1().RESTClient()

var podListerWatcher cache.ListerWatcher
if config.ObservePods {
var podSelector fields.Selector
if config.Node == "" {
podSelector = fields.Everything()
} else {
podSelector = fields.OneTermEqualSelector("spec.nodeName", config.Node)
}
telemetrySettings.Logger.Debug("observing pods")
podListerWatcher = cache.NewListWatchFromClient(restClient, "pods", v1.NamespaceAll, podSelector)
}

var nodeListerWatcher cache.ListerWatcher
if config.ObserveNodes {
var nodeSelector fields.Selector
if config.Node == "" {
nodeSelector = fields.Everything()
} else {
nodeSelector = fields.OneTermEqualSelector("metadata.name", config.Node)
}
telemetrySettings.Logger.Debug("observing nodes")
nodeListerWatcher = cache.NewListWatchFromClient(restClient, "nodes", v1.NamespaceAll, nodeSelector)
}

obs := &k8sObserver{
telemetry: telemetrySettings,
podListerWatcher: podListerWatcher,
nodeListerWatcher: nodeListerWatcher,
stop: make(chan struct{}),
config: config,
}

return obs, nil
}
Loading

0 comments on commit ebea5bc

Please sign in to comment.