Skip to content

Commit

Permalink
[receiver_creator] add receiver-specific resource attributes (#11766)
Browse files Browse the repository at this point in the history
and validate global resource_attributes endpoint type map keys
  • Loading branch information
rmfitzpatrick authored Aug 3, 2022
1 parent 4260dd6 commit 8505e94
Show file tree
Hide file tree
Showing 11 changed files with 218 additions and 24 deletions.
38 changes: 32 additions & 6 deletions receiver/receivercreator/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,13 @@ config:
endpoint: '`endpoint`:8080'
```

**receivers.<receiver_type/id>.resource_attributes**
**receivers.resource_attributes**

```yaml
resource_attributes:
<endpoint type>:
<attribute>: <attribute value>
```
This setting controls what resource attributes are set on metrics emitted from the created receiver. These attributes can be set from [values in the endpoint](#rule-expressions) that was matched by the `rule`. These attributes vary based on the endpoint type. These defaults can be disabled by setting the attribute to be removed to an empty value. Note that the values can be dynamic and processed the same as in `config`.

Expand Down Expand Up @@ -98,6 +104,18 @@ None

See `redis/2` in [examples](#examples).


**receivers.&lt;receiver_type/id&gt;.resource_attributes**

```yaml
receivers:
<receiver_type>:
resource_attributes:
<attribute>: <attribute string value>
```

Similar to the per-endpoint type `resource_attributes` described above but for individual receiver instances. Duplicate attribute entries (including the empty string) in this receiver-specific mapping take precedence. These attribute values also support expansion from endpoint environment content. At this time their values must be strings.

## Rule Expressions

Each rule must start with `type == ("pod"|"port"|"hostport"|"container") &&` such that the rule matches
Expand Down Expand Up @@ -195,6 +213,10 @@ receivers:
config:
metrics_path: '`"prometheus.io/path" in annotations ? annotations["prometheus.io/path"] : "/metrics"`'
endpoint: '`endpoint`:`"prometheus.io/port" in annotations ? annotations["prometheus.io/port"] : 9090`'
resource_attributes:
an.attribute: a.value
# Dynamic configuration values
app.version: '`labels["app_version"]`'

redis/1:
# If this rule matches an instance of this receiver will be started.
Expand All @@ -210,18 +232,22 @@ receivers:
rule: type == "port" && port == 6379

resource_attributes:
# Dynamic configuration values
service.name: '`pod.labels["service_name"]`'
app: '`pod.labels["app"]`'
# Dynamic configuration values, overwriting default attributes`
pod:
service.name: '`labels["service_name"]`'
app: '`labels["app"]`'
port:
service.name: '`pod.labels["service_name"]`'
app: '`pod.labels["app"]`'
receiver_creator/2:
# Name of the extensions to watch for endpoints to start and stop.
watch_observers: [host_observer]
receivers:
redis/on_host:
# If this rule matches an instance of this receiver will be started.
rule: type == "port" && port == 6379 && is_ipv6 == true
resource_attributes:
service.name: redis_on_host
resource_attributes:
service.name: redis_on_host
receiver_creator/3:
watch_observers: [k8s_observer]
receivers:
Expand Down
19 changes: 18 additions & 1 deletion receiver/receivercreator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ type receiverTemplate struct {
// Rule is the discovery rule that when matched will create a receiver instance
// based on receiverTemplate.
Rule string `mapstructure:"rule"`
rule rule
// ResourceAttributes is a map of resource attributes to add to just this receiver's resource metrics.
// It can contain expr expressions for endpoint env value expansion
ResourceAttributes map[string]interface{} `mapstructure:"resource_attributes"`
rule rule
}

// resourceAttributes holds a map of default resource attributes for each Endpoint type.
Expand Down Expand Up @@ -97,6 +100,14 @@ func (cfg *Config) Unmarshal(componentParser *confmap.Conf) error {
return err
}

for endpointType := range cfg.ResourceAttributes {
switch endpointType {
case observer.ContainerType, observer.HostPortType, observer.K8sNodeType, observer.PodType, observer.PortType:
default:
return fmt.Errorf("resource attributes for unsupported endpoint type %q", endpointType)
}
}

receiversCfg, err := componentParser.Sub(receiversConfigKey)
if err != nil {
return fmt.Errorf("unable to extract key %v: %w", receiversConfigKey, err)
Expand All @@ -123,6 +134,12 @@ func (cfg *Config) Unmarshal(componentParser *confmap.Conf) error {
return fmt.Errorf("subreceiver %q rule is invalid: %w", subreceiverKey, err)
}

for k, v := range subreceiver.ResourceAttributes {
if _, ok := v.(string); !ok {
return fmt.Errorf("unsupported `resource_attributes` %q value %v in %s", k, v, subreceiverKey)
}
}

cfg.receiverTemplates[subreceiverKey] = subreceiver
}

Expand Down
26 changes: 26 additions & 0 deletions receiver/receivercreator/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,32 @@ func TestLoadConfig(t *testing.T) {
assert.Equal(t, []config.Type{"mock_observer"}, r1.WatchObservers)
}

func TestInvalidResourceAttributeEndpointType(t *testing.T) {
factories, err := componenttest.NopFactories()
require.Nil(t, err)

factories.Receivers[("nop")] = &nopWithEndpointFactory{ReceiverFactory: componenttest.NewNopReceiverFactory()}

factory := NewFactory()
factories.Receivers[typeStr] = factory
cfg, err := servicetest.LoadConfigAndValidate(filepath.Join("testdata", "invalid-resource-attributes.yaml"), factories)
require.EqualError(t, err, "error reading receivers configuration for \"receiver_creator\": resource attributes for unsupported endpoint type \"not.a.real.type\"")
require.Nil(t, cfg)
}

func TestInvalidReceiverResourceAttributeValueType(t *testing.T) {
factories, err := componenttest.NopFactories()
require.Nil(t, err)

factories.Receivers[("nop")] = &nopWithEndpointFactory{ReceiverFactory: componenttest.NewNopReceiverFactory()}

factory := NewFactory()
factories.Receivers[typeStr] = factory
cfg, err := servicetest.LoadConfigAndValidate(filepath.Join("testdata", "invalid-receiver-resource-attributes.yaml"), factories)
require.EqualError(t, err, "error reading receivers configuration for \"receiver_creator\": unsupported `resource_attributes` \"one\" value <nil> in examplereceiver/1")
require.Nil(t, cfg)
}

type nopWithEndpointConfig struct {
config.ReceiverSettings `mapstructure:",squash"`
Endpoint string `mapstructure:"endpoint"`
Expand Down
11 changes: 11 additions & 0 deletions receiver/receivercreator/observerhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,21 @@ func (obs *observerHandler) OnAdd(added []observer.Endpoint) {
continue
}

resAttrs := map[string]string{}
for k, v := range template.ResourceAttributes {
strVal, ok := v.(string)
if !ok {
obs.logger.Info(fmt.Sprintf("ignoring unsupported `resource_attributes` %q value %v", k, v))
continue
}
resAttrs[k] = strVal
}

// Adds default and/or configured resource attributes (e.g. k8s.pod.uid) to resources
// as telemetry is emitted.
resourceEnhancer, err := newResourceEnhancer(
obs.config.ResourceAttributes,
resAttrs,
env,
e,
obs.nextConsumer,
Expand Down
4 changes: 2 additions & 2 deletions receiver/receivercreator/observerhandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestOnAdd(t *testing.T) {
rcvrCfg := receiverConfig{id: config.NewComponentIDWithName("name", "1"), config: userConfigMap{"foo": "bar"}}
cfg := createDefaultConfig().(*Config)
cfg.receiverTemplates = map[string]receiverTemplate{
"name/1": {rcvrCfg, "", newRuleOrPanic(`type == "port"`)},
"name/1": {rcvrCfg, "", map[string]interface{}{}, newRuleOrPanic(`type == "port"`)},
}
handler := &observerHandler{
config: cfg,
Expand Down Expand Up @@ -104,7 +104,7 @@ func TestOnChange(t *testing.T) {
newRcvr := &nopWithEndpointReceiver{}
cfg := createDefaultConfig().(*Config)
cfg.receiverTemplates = map[string]receiverTemplate{
"name/1": {rcvrCfg, "", newRuleOrPanic(`type == "port"`)},
"name/1": {rcvrCfg, "", map[string]interface{}{}, newRuleOrPanic(`type == "port"`)},
}
handler := &observerHandler{
config: cfg,
Expand Down
28 changes: 18 additions & 10 deletions receiver/receivercreator/resourceenhancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,23 +36,31 @@ type resourceEnhancer struct {

func newResourceEnhancer(
resources resourceAttributes,
receiverAttributes map[string]string,
env observer.EndpointEnv,
endpoint observer.Endpoint,
nextConsumer consumer.Metrics,
) (*resourceEnhancer, error) {
attrs := map[string]string{}

// Precompute values that will be inserted for each resource object passed through.
for attr, expr := range resources[endpoint.Details.Type()] {
res, err := evalBackticksInConfigValue(expr, env)
if err != nil {
return nil, fmt.Errorf("failed processing resource attribute %q for endpoint %v: %w", attr, endpoint.ID, err)
}
for _, resource := range []map[string]string{resources[endpoint.Details.Type()], receiverAttributes} {
// Precompute values that will be inserted for each resource object passed through.
for attr, expr := range resource {
// If the attribute value is empty this signals to delete existing
if expr == "" {
delete(attrs, attr)
continue
}

res, err := evalBackticksInConfigValue(expr, env)
if err != nil {
return nil, fmt.Errorf("failed processing resource attribute %q for endpoint %v: %w", attr, endpoint.ID, err)
}

// If the attribute value is empty user has likely removed the default value so skip it.
val := fmt.Sprint(res)
if val != "" {
attrs[attr] = val
val := fmt.Sprint(res)
if val != "" {
attrs[attr] = val
}
}
}

Expand Down
44 changes: 39 additions & 5 deletions receiver/receivercreator/resourceenhancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,11 @@ func Test_newResourceEnhancer(t *testing.T) {

cfg := createDefaultConfig().(*Config)
type args struct {
resources resourceAttributes
env observer.EndpointEnv
endpoint observer.Endpoint
nextConsumer consumer.Metrics
resources resourceAttributes
resourceAttributes map[string]string
env observer.EndpointEnv
endpoint observer.Endpoint
nextConsumer consumer.Metrics
}
tests := []struct {
name string
Expand Down Expand Up @@ -131,6 +132,39 @@ func Test_newResourceEnhancer(t *testing.T) {
},
wantErr: false,
},
{
name: "both forms of resource attributes",
args: args{
resources: func() resourceAttributes {
res := map[observer.EndpointType]map[string]string{observer.PodType: {}}
for k, v := range cfg.ResourceAttributes[observer.PodType] {
res[observer.PodType][k] = v
}
res[observer.PodType]["duplicate.resource.attribute"] = "pod.value"
res[observer.PodType]["delete.me"] = "pod.value"
return res
}(),
resourceAttributes: map[string]string{
"expanded.resource.attribute": "`'labels' in pod ? pod.labels['region'] : labels['region']`",
"duplicate.resource.attribute": "receiver.value",
"delete.me": "",
},
env: podEnv,
endpoint: podEndpoint,
nextConsumer: nil,
},
want: &resourceEnhancer{
nextConsumer: nil,
attrs: map[string]string{
"k8s.namespace.name": "default",
"k8s.pod.name": "pod-1",
"k8s.pod.uid": "uid-1",
"duplicate.resource.attribute": "receiver.value",
"expanded.resource.attribute": "west-1",
},
},
wantErr: false,
},
{
name: "error",
args: args{
Expand All @@ -149,7 +183,7 @@ func Test_newResourceEnhancer(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := newResourceEnhancer(tt.args.resources, tt.args.env, tt.args.endpoint, tt.args.nextConsumer)
got, err := newResourceEnhancer(tt.args.resources, tt.args.resourceAttributes, tt.args.env, tt.args.endpoint, tt.args.nextConsumer)
if (err != nil) != tt.wantErr {
t.Errorf("newResourceEnhancer() error = %v, wantErr %v", err, tt.wantErr)
return
Expand Down
17 changes: 17 additions & 0 deletions receiver/receivercreator/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,27 @@ receivers:
receivers:
examplereceiver/1:
rule: type == "port"
config:
key: value
resource_attributes:
one: two
nop/1:
rule: type == "port"
config:
endpoint: localhost:12345
resource_attributes:
two: three
resource_attributes:
container:
container.key: container.value
pod:
pod.key: pod.value
port:
port.key: port.value
hostport:
hostport.key: hostport.value
k8s.node:
k8s.node.key: k8s.node.value

processors:
nop:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
receivers:
receiver_creator:
watch_observers: [mock_observer]
receivers:
examplereceiver/1:
rule: type == "port"
config:
key: value
resource_attributes:
one: null

processors:
nop:

exporters:
nop:

service:
pipelines:
metrics:
receivers: [receiver_creator]
processors: [nop]
exporters: [nop]
28 changes: 28 additions & 0 deletions receiver/receivercreator/testdata/invalid-resource-attributes.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
receivers:
receiver_creator:
watch_observers: [mock_observer]
receivers:
examplereceiver/1:
rule: type == "port"
config:
key: value
resource_attributes:
one: two
resource_attributes:
k8s.node:
k8s.node.key: k8s.node.value
not.a.real.type:
not: real

processors:
nop:

exporters:
nop:

service:
pipelines:
metrics:
receivers: [receiver_creator]
processors: [nop]
exporters: [nop]
4 changes: 4 additions & 0 deletions unreleased/receivercreatorresourceattributes.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
change_type: enhancement
component: receivercreator
note: add per-receiver `resource_attribute` and validate endpoint type keys on global
issues: [11766]

0 comments on commit 8505e94

Please sign in to comment.