diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index cf02390046d1..68a08139b7ad 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -134,6 +134,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Remove xpack enabled flag on ES, Logstash, Beats and Kibana {pull}24427[24427] - Adjust host fields to adopt new names from 1.9.0 ECS. {pull}24312[24312] - Add replicas.ready field to state_statefulset in Kubernetes module{pull}26088[26088] +- Added `statsd.mappings` configuration for Statsd module {pull}26220[26220] +- Added Airflow lightweight module {pull}26220[26220] - Add state_job metricset to Kubernetes module{pull}26479[26479] *Packetbeat* diff --git a/metricbeat/docs/fields.asciidoc b/metricbeat/docs/fields.asciidoc index 6e2fdbc16738..20c0a493a6fc 100644 --- a/metricbeat/docs/fields.asciidoc +++ b/metricbeat/docs/fields.asciidoc @@ -14,6 +14,7 @@ grouped in the following categories: * <> * <> +* <> * <> * <> * <> @@ -843,6 +844,234 @@ type: boolean -- +[[exported-fields-airflow]] +== Airflow fields + +Airflow module + + + + +*`airflow.*.1m_rate`*:: ++ +-- +Airflow 1m rate timers metric + + +type: object + +-- + +*`airflow.*.5m_rate`*:: ++ +-- +Airflow 5m rate timers metric + + +type: object + +-- + +*`airflow.*.15m_rate`*:: ++ +-- +Airflow 15 rate timers metric + + +type: object + +-- + +*`airflow.*.count`*:: ++ +-- +Airflow counters + + +type: object + +-- + +*`airflow.*.max`*:: ++ +-- +Airflow max timers metric + + +type: object + +-- + +*`airflow.*.mean_rate`*:: ++ +-- +Airflow mean rate timers metric + + +type: object + +-- + +*`airflow.*.mean`*:: ++ +-- +Airflow mean timers metric + + +type: object + +-- + +*`airflow.*.median`*:: ++ +-- +Airflow median timers metric + + +type: object + +-- + +*`airflow.*.min`*:: ++ +-- +Airflow min timers metric + + +type: object + +-- + +*`airflow.*.p75`*:: ++ +-- +Airflow 75 percentile timers metric + + +type: object + +-- + +*`airflow.*.p95`*:: ++ +-- +Airflow 95 percentile timers metric + + +type: object + +-- + +*`airflow.*.p99_9`*:: ++ +-- +Airflow 99.9 percentile timers metric + + +type: object + +-- + +*`airflow.*.p99`*:: ++ +-- +Airflow 99 percentile timers metric + + +type: object + +-- + +*`airflow.*.stddev`*:: ++ +-- +Airflow standard deviation timers metric + + +type: object + +-- + +*`airflow.*.value`*:: ++ +-- +Airflow gauges + + +type: object + +-- + +*`airflow.dag_file`*:: ++ +-- +Airflow dag file metadata + + +type: keyword + +-- + +*`airflow.dag_id`*:: ++ +-- +Airflow dag id metadata + + +type: keyword + +-- + +*`airflow.job_name`*:: ++ +-- +Airflow job name metadata + + +type: keyword + +-- + +*`airflow.operator_name`*:: ++ +-- +Airflow operator name metadata + + +type: keyword + +-- + +*`airflow.pool_name`*:: ++ +-- +Airflow pool name metadata + + +type: keyword + +-- + +*`airflow.status`*:: ++ +-- +Airflow status metadata + + +type: keyword + +-- + +*`airflow.task_id`*:: ++ +-- +Airflow task id metadata + + +type: keyword + +-- + [[exported-fields-apache]] == Apache fields diff --git a/metricbeat/docs/modules/airflow.asciidoc b/metricbeat/docs/modules/airflow.asciidoc new file mode 100644 index 000000000000..0967cee2b598 --- /dev/null +++ b/metricbeat/docs/modules/airflow.asciidoc @@ -0,0 +1,60 @@ +//// +This file is generated! See scripts/mage/docs_collector.go +//// + +[[metricbeat-module-airflow]] +[role="xpack"] +== Airflow module + +beta[] + +This module collects metrics from +https://airflow.apache.org/docs/apache-airflow/stable/logging-monitoring/metrics.html[Airflow metrics] running a +statsd server where airflow will send metrics to. The default metricset is `statsd`. + +[float] +=== Compatibility + +The Airflow module is tested with Airflow 2.1.0. It should work with version +2.0.0 and later. + +[float] +=== Usage +The Airflow module requires <> to receive Statsd metrics. Refer to the link for instructions about how to use Statsd. + +Add the following lines to your Airflow configuration file e.g. `airflow.cfg` ensuring `statsd_prefix` is left empty and replace `%METRICBEAT_HOST%` with the address metricbeat is running: + +``` +[metrics] +statsd_on = True +statsd_host = %METRICBEAT_HOST% +statsd_port = 8126 +statsd_prefix = +``` + + +[float] +=== Example configuration + +The Airflow module supports the standard configuration options that are described +in <>. Here is an example configuration: + +[source,yaml] +---- +metricbeat.modules: +- module: airflow + host: "localhost" + port: "8126" + #ttl: "30s" + metricsets: [ 'statsd' ] +---- + +[float] +=== Metricsets + +The following metricsets are available: + +* <> + +include::airflow/statsd.asciidoc[] + diff --git a/metricbeat/docs/modules/airflow/statsd.asciidoc b/metricbeat/docs/modules/airflow/statsd.asciidoc new file mode 100644 index 000000000000..539ff092d013 --- /dev/null +++ b/metricbeat/docs/modules/airflow/statsd.asciidoc @@ -0,0 +1,25 @@ +//// +This file is generated! See scripts/mage/docs_collector.go +//// + +[[metricbeat-metricset-airflow-statsd]] +[role="xpack"] +=== Airflow statsd metricset + +beta[] + +include::../../../../x-pack/metricbeat/module/airflow/statsd/_meta/docs.asciidoc[] + +This is a default metricset. If the host module is unconfigured, this metricset is enabled by default. + +==== Fields + +For a description of each field in the metricset, see the +<> section. + +Here is an example document generated by this metricset: + +[source,json] +---- +include::../../../../x-pack/metricbeat/module/airflow/statsd/_meta/data.json[] +---- diff --git a/metricbeat/docs/modules/statsd.asciidoc b/metricbeat/docs/modules/statsd.asciidoc index 491d1e718773..1c7dc8de8fe7 100644 --- a/metricbeat/docs/modules/statsd.asciidoc +++ b/metricbeat/docs/modules/statsd.asciidoc @@ -33,7 +33,28 @@ The `statsd` module has these additional config options: Irrespective of the given ttl, metrics will be reported at least once. A ttl of zero means metrics will never expire. -[float] +*`statsd.mapping`*:: It defines how metrics will mapped from the original metric label to the event json. +Here's an example configuration: +[source,yaml] +---- +statsd.mappings: + - metric: 'ti_failures' <1> + value: + field: task_failures <2> + - metric: '_start' <1> + labels: + - attr: job_name <3> + field: job_name <4> + value: + field: started <2> +---- + +<1> `metric`, required: the label key of the metric in statsd, either as a exact match string +or as a template with named label placeholder in the format `` +<2> `value.field`, required: field name where to save the metric value in the event json +<3> `label[].attr`, required when using named label placeholder: reference to the named label placeholder defined in `metric` +<4> `label[].field`, required when using named label placeholder field name where to save the named label placeholder value from the template in the event json + === Metricsets Currently, there is only `server` metricset in `statsd` module. diff --git a/metricbeat/docs/modules_list.asciidoc b/metricbeat/docs/modules_list.asciidoc index dc89e7ea8b02..3e2e0b7f5bfa 100644 --- a/metricbeat/docs/modules_list.asciidoc +++ b/metricbeat/docs/modules_list.asciidoc @@ -11,6 +11,8 @@ This file is generated! See scripts/mage/docs_collector.go |<> |<> |image:./images/icon-yes.png[Prebuilt dashboards are available] | .1+| .1+| |<> +|<> beta[] |image:./images/icon-no.png[No prebuilt dashboards] | +.1+| .1+| |<> beta[] |<> |image:./images/icon-yes.png[Prebuilt dashboards are available] | .1+| .1+| |<> |<> beta[] |image:./images/icon-no.png[No prebuilt dashboards] | @@ -306,6 +308,7 @@ This file is generated! See scripts/mage/docs_collector.go include::modules/activemq.asciidoc[] include::modules/aerospike.asciidoc[] +include::modules/airflow.asciidoc[] include::modules/apache.asciidoc[] include::modules/appsearch.asciidoc[] include::modules/aws.asciidoc[] diff --git a/x-pack/metricbeat/include/list.go b/x-pack/metricbeat/include/list.go index 954c6ba827bc..6fdca95061ce 100644 --- a/x-pack/metricbeat/include/list.go +++ b/x-pack/metricbeat/include/list.go @@ -9,6 +9,7 @@ package include import ( // Import packages that need to register themselves. _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/activemq" + _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/airflow" _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/appsearch" _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/appsearch/stats" _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/aws" diff --git a/x-pack/metricbeat/metricbeat.reference.yml b/x-pack/metricbeat/metricbeat.reference.yml index 718d723e58ea..1b5fc6ab8b94 100644 --- a/x-pack/metricbeat/metricbeat.reference.yml +++ b/x-pack/metricbeat/metricbeat.reference.yml @@ -158,6 +158,13 @@ metricbeat.modules: period: 10s hosts: ["localhost:3000"] +#------------------------------- Airflow Module ------------------------------- +- module: airflow + host: "localhost" + port: "8126" + #ttl: "30s" + metricsets: [ 'statsd' ] + #-------------------------------- Apache Module -------------------------------- - module: apache metricsets: ["status"] diff --git a/x-pack/metricbeat/module/airflow/_meta/README.md b/x-pack/metricbeat/module/airflow/_meta/README.md new file mode 100644 index 000000000000..95824e033c3e --- /dev/null +++ b/x-pack/metricbeat/module/airflow/_meta/README.md @@ -0,0 +1,29 @@ +# Airflow metricbeat module + +## How to test +You don't need a running Airflow instance in order to test the module: +sending a `statsd` metric (https://github.com/statsd/statsd/blob/master/docs/metric_types.md) to the `statsd` listener started by the module is enough: + +``` +$ ./metricbeat modules enable airflow +$ ./metricbeat -e # (Start metricbeat according to your preferred setup) +$ echo "dagrun.duration.failed.dagid:200|ms" > /dev/udp/127.0.0.1/8126 # (with any of the metrics that can be found at https://airflow.apache.org/docs/apache-airflow/stable/logging-monitoring/metrics.html) +``` + +### Testing with Airflow +You need to install `statsd` module in Airflow and let it point to host and port defined in the module: +https://airflow.apache.org/docs/apache-airflow/2.0.0/logging-monitoring/metrics.html + +``` +$ pip install 'apache-airflow[statsd]' +``` + +Add the following lines to your configuration file e.g. `airflow.cfg` ensuring `statsd_prefix` is left empty: + +``` +[metrics] +statsd_on = True +statsd_host = localhost +statsd_port = 8126 +statsd_prefix = +``` diff --git a/x-pack/metricbeat/module/airflow/_meta/config.yml b/x-pack/metricbeat/module/airflow/_meta/config.yml new file mode 100644 index 000000000000..25ad81516a24 --- /dev/null +++ b/x-pack/metricbeat/module/airflow/_meta/config.yml @@ -0,0 +1,5 @@ +- module: airflow + host: "localhost" + port: "8126" + #ttl: "30s" + metricsets: [ 'statsd' ] diff --git a/x-pack/metricbeat/module/airflow/_meta/docs.asciidoc b/x-pack/metricbeat/module/airflow/_meta/docs.asciidoc new file mode 100644 index 000000000000..16f45db7454d --- /dev/null +++ b/x-pack/metricbeat/module/airflow/_meta/docs.asciidoc @@ -0,0 +1,23 @@ +This module collects metrics from +https://airflow.apache.org/docs/apache-airflow/stable/logging-monitoring/metrics.html[Airflow metrics] running a +statsd server where airflow will send metrics to. The default metricset is `statsd`. + +[float] +=== Compatibility + +The Airflow module is tested with Airflow 2.1.0. It should work with version +2.0.0 and later. + +[float] +=== Usage +The Airflow module requires <> to receive Statsd metrics. Refer to the link for instructions about how to use Statsd. + +Add the following lines to your Airflow configuration file e.g. `airflow.cfg` ensuring `statsd_prefix` is left empty and replace `%METRICBEAT_HOST%` with the address metricbeat is running: + +``` +[metrics] +statsd_on = True +statsd_host = %METRICBEAT_HOST% +statsd_port = 8126 +statsd_prefix = +``` diff --git a/x-pack/metricbeat/module/airflow/_meta/fields.yml b/x-pack/metricbeat/module/airflow/_meta/fields.yml new file mode 100644 index 000000000000..1a81133970ab --- /dev/null +++ b/x-pack/metricbeat/module/airflow/_meta/fields.yml @@ -0,0 +1,127 @@ +- key: airflow + title: "Airflow" + description: > + Airflow module + release: beta + fields: + - name: airflow + type: group + fields: + - name: '*.1m_rate' + type: object + object_type: double + object_type_mapping_type: "*" + description: > + Airflow 1m rate timers metric + - name: '*.5m_rate' + type: object + object_type: double + object_type_mapping_type: "*" + description: > + Airflow 5m rate timers metric + - name: '*.15m_rate' + type: object + object_type: double + object_type_mapping_type: "*" + description: > + Airflow 15 rate timers metric + - name: '*.count' + type: object + object_type: double + object_type_mapping_type: "*" + description: > + Airflow counters + - name: '*.max' + type: object + object_type: double + object_type_mapping_type: "*" + description: > + Airflow max timers metric + - name: '*.mean_rate' + type: object + object_type: double + object_type_mapping_type: "*" + description: > + Airflow mean rate timers metric + - name: '*.mean' + type: object + object_type: double + object_type_mapping_type: "*" + description: > + Airflow mean timers metric + - name: '*.median' + type: object + object_type: double + object_type_mapping_type: "*" + description: > + Airflow median timers metric + - name: '*.min' + type: object + object_type: double + object_type_mapping_type: "*" + description: > + Airflow min timers metric + - name: '*.p75' + type: object + object_type: double + object_type_mapping_type: "*" + description: > + Airflow 75 percentile timers metric + - name: '*.p95' + type: object + object_type: double + object_type_mapping_type: "*" + description: > + Airflow 95 percentile timers metric + - name: '*.p99_9' + type: object + object_type: double + object_type_mapping_type: "*" + description: > + Airflow 99.9 percentile timers metric + - name: '*.p99' + type: object + object_type: double + object_type_mapping_type: "*" + description: > + Airflow 99 percentile timers metric + - name: '*.stddev' + type: object + object_type: double + object_type_mapping_type: "*" + description: > + Airflow standard deviation timers metric + - name: '*.value' + type: object + object_type: double + object_type_mapping_type: "*" + description: > + Airflow gauges + - name: 'dag_file' + type: keyword + description: > + Airflow dag file metadata + - name: 'dag_id' + type: keyword + description: > + Airflow dag id metadata + - name: 'job_name' + type: keyword + description: > + Airflow job name metadata + - name: 'operator_name' + type: keyword + description: > + Airflow operator name metadata + - name: 'pool_name' + type: keyword + description: > + Airflow pool name metadata + - name: 'status' + type: keyword + description: > + Airflow status metadata + - name: 'task_id' + type: keyword + description: > + Airflow task id metadata diff --git a/x-pack/metricbeat/module/airflow/fields.go b/x-pack/metricbeat/module/airflow/fields.go new file mode 100644 index 000000000000..117076798529 --- /dev/null +++ b/x-pack/metricbeat/module/airflow/fields.go @@ -0,0 +1,23 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// Code generated by beats/dev-tools/cmd/asset/asset.go - DO NOT EDIT. + +package airflow + +import ( + "github.com/elastic/beats/v7/libbeat/asset" +) + +func init() { + if err := asset.SetFields("metricbeat", "airflow", asset.ModuleFieldsPri, AssetAirflow); err != nil { + panic(err) + } +} + +// AssetAirflow returns asset data. +// This is the base64 encoded gzipped contents of module/airflow. +func AssetAirflow() string { + return "eJzMl8GK4zAMQO/5CtHLwMAM9BCG5LCwXxKUSA1q7djYSmf694vTdslCtk0hB+dUJFnvKQjqfMCJLzWghINx3wWAihquYff7GtkVAMSxC+JV3FDDrwIA4JYF62g0XAAENoyRa2hZsQA4CBuK9VT8AQNankPSoxfPNfTBjf4WmZ+Zn3t7/9zbJqDy29/c/bxrj9zpLHwNNNcsubGd9BayjUXvZehvpbv33axuYeT7cx99byEpgYrlEMGyBumW3Mv83Mu17vsM5fflSvnOjYPmZD4JcYhLshZ/clK1+PP8BVvGIbv1SFIrFySVZqe+wpokN+9ktMJc8tKWFc7+q8zJ+asEz6HjQcWs2HBfZWVfvWpfNVVW/tVn9eoEefm/ZB+ViM85DRAVB8JAQHwWTJXPpzijGbP6h+px7HnhEkDYNwcxC64nvny7QK9hCHtI7dKrQcLpWr5AFNqQJ/SAdnRtk35uxDu6dur8gOg8B1QXtsTeez5je+fMltzU7xkzKuoYNwJemz2gKcbTduuTuv1nf/79uPwTAAD//9XSEKo=" +} diff --git a/x-pack/metricbeat/module/airflow/module.yml b/x-pack/metricbeat/module/airflow/module.yml new file mode 100644 index 000000000000..f5d5951b9ab7 --- /dev/null +++ b/x-pack/metricbeat/module/airflow/module.yml @@ -0,0 +1,3 @@ +name: airflow +metricsets: +- statsd diff --git a/x-pack/metricbeat/module/airflow/statsd/_meta/data.json b/x-pack/metricbeat/module/airflow/statsd/_meta/data.json new file mode 100644 index 000000000000..4e5c41437cb9 --- /dev/null +++ b/x-pack/metricbeat/module/airflow/statsd/_meta/data.json @@ -0,0 +1,34 @@ +{ + "@timestamp": "2017-10-12T08:05:34.853Z", + "airflow": { + "dag_duration": { + "15m_rate": 0.2, + "1m_rate": 0.2, + "5m_rate": 0.2, + "count": 1, + "max": 200, + "mean": 200, + "mean_rate": 0.2222490946071946, + "median": 200, + "min": 200, + "p75": 200, + "p95": 200, + "p99": 200, + "p99_9": 200, + "stddev": 0 + }, + "dag_id": "a_dagid", + "status": "failure" + }, + "event": { + "dataset": "statsd", + "module": "airflow" + }, + "labels": { + "k1": "v1", + "k2": "v2" + }, + "service": { + "type": "airflow" + } +} \ No newline at end of file diff --git a/x-pack/metricbeat/module/airflow/statsd/_meta/docs.asciidoc b/x-pack/metricbeat/module/airflow/statsd/_meta/docs.asciidoc new file mode 100644 index 000000000000..529afc6d8e2f --- /dev/null +++ b/x-pack/metricbeat/module/airflow/statsd/_meta/docs.asciidoc @@ -0,0 +1 @@ +Statsd Metricset retrieves the Airflow metrics using Statsd. diff --git a/x-pack/metricbeat/module/airflow/statsd/_meta/fields.yml b/x-pack/metricbeat/module/airflow/statsd/_meta/fields.yml new file mode 100644 index 000000000000..8033a27f5ac5 --- /dev/null +++ b/x-pack/metricbeat/module/airflow/statsd/_meta/fields.yml @@ -0,0 +1 @@ +- release: beta diff --git a/x-pack/metricbeat/module/airflow/statsd/data_test.go b/x-pack/metricbeat/module/airflow/statsd/data_test.go new file mode 100644 index 000000000000..5c5dc2387299 --- /dev/null +++ b/x-pack/metricbeat/module/airflow/statsd/data_test.go @@ -0,0 +1,75 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package statsd + +import ( + "fmt" + "net" + "runtime" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/auditbeat/core" + _ "github.com/elastic/beats/v7/libbeat/processors/actions" + "github.com/elastic/beats/v7/metricbeat/mb" + mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing" + _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/statsd/server" +) + +func init() { + mb.Registry.SetSecondarySource(mb.NewLightModulesSource("../../../module")) +} + +const ( + STATSD_HOST = "localhost" + STATSD_PORT = 8126 +) + +func getConfig() map[string]interface{} { + return map[string]interface{}{ + "module": "airflow", + "metricsets": []string{"statsd"}, + "host": STATSD_HOST, + "port": STATSD_PORT, + "period": "100ms", + } +} + +func createEvent(t *testing.T) { + udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", STATSD_HOST, STATSD_PORT)) + require.NoError(t, err) + + conn, err := net.DialUDP("udp", nil, udpAddr) + require.NoError(t, err) + + _, err = fmt.Fprint(conn, "dagrun.duration.failed.a_dagid:200|ms|#k1:v1,k2:v2") + require.NoError(t, err) +} + +func TestData(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("Skipping `data.json` generation test") + } + + ms := mbtest.NewPushMetricSetV2(t, getConfig()) + var events []mb.Event + done := make(chan interface{}) + go func() { + events = mbtest.RunPushMetricSetV2(5*time.Second, 1, ms) + close(done) + }() + + createEvent(t) + <-done + + if len(events) == 0 { + t.Fatal("received no events") + } + + beatEvent := mbtest.StandardizeEvent(ms, events[0], core.AddDatasetToEvent) + mbtest.WriteEventToDataJSON(t, beatEvent, "") +} diff --git a/x-pack/metricbeat/module/airflow/statsd/manifest.yml b/x-pack/metricbeat/module/airflow/statsd/manifest.yml new file mode 100644 index 000000000000..ccb718f233b7 --- /dev/null +++ b/x-pack/metricbeat/module/airflow/statsd/manifest.yml @@ -0,0 +1,276 @@ +default: true +input: + module: statsd + metricset: server + defaults: + namespace: "airflow" + statsd.mappings: + - metric: '_start' + labels: + - attr: job_name + field: job_name + value: + field: started + - metric: '_end' + labels: + - attr: job_name + field: job_name + value: + field: ended + - metric: _heartbeat_failure + labels: + - attr: job_name + field: job_name + value: + field: heartbeat_failure + - metric: 'operator_failures_' + labels: + - attr: operator_name + field: operator_name + value: + field: failures + - metric: 'operator_successes_' + labels: + - attr: operator_name + field: operator_name + value: + field: successes + - metric: 'ti_failures' + value: + field: task_failures + - metric: 'ti_successes' + value: + field: task_successes + - metric: 'previously_succeeded' + value: + field: previously_succeeded + - metric: 'zombies_killed' + value: + field: zombies_killed + - metric: 'scheduler_heartbeat' + value: + field: scheduler_heartbeat + - metric: 'dag_processing.manager_stalls' + value: + field: dag_file_processor_manager_stalls + - metric: 'dag_file_refresh_error' + value: + field: dag_file_refresh_error + - metric: 'dag_processing.processes' + value: + field: dag_processes + - metric: 'scheduler.tasks.killed_externally' + value: + field: task_killed_externally + - metric: 'scheduler.tasks.running' + value: + field: task_running + - metric: 'scheduler.tasks.starving' + value: + field: task_starving + - metric: 'scheduler.orphaned_tasks.cleared' + value: + field: task_orphaned_cleared + - metric: 'scheduler.orphaned_tasks.adopted' + value: + field: task_orphaned_adopted + - metric: 'scheduler.critical_section_busy' + value: + field: scheduler_critical_section_busy + - metric: 'sla_email_notification_failure' + value: + field: sla_email_notification_failure + - metric: 'ti.start..' + labels: + - attr: dagid + field: dag_id + - attr: taskid + field: task_id + value: + field: task_started + - metric: 'ti.finish...' + labels: + - attr: dagid + field: dag_id + - attr: taskid + field: task_id + - attr: status + field: status + value: + field: task_finished + - metric: 'dag.callback_exceptions' + value: + field: dag_callback_exceptions + - metric: 'celery.task_timeout_error' + value: + field: task_celery_timeout_error + - metric: 'task_removed_from_dag.' + labels: + - attr: dagid + field: dag_id + value: + field: task_removed + - metric: 'task_restored_to_dag.' + labels: + - attr: dagid + field: dag_id + value: + field: task_restored + - metric: 'task_instance_created-' + labels: + - attr: operator_name + field: operator_name + value: + field: task_created + - metric: 'dagbag_size' + value: + field: dag_bag_size + - metric: 'dag_processing.import_errors' + value: + field: dag_import_errors + - metric: 'dag_processing.total_parse_time' + value: + field: dag_total_parse_time + - metric: 'dag_processing.last_runtime.' + labels: + - attr: dag_file + field: dag_file + value: + field: dag_last_runtime + - metric: 'dag_processing.last_run.seconds_ago.' + labels: + - attr: dag_file + field: dag_file + value: + field: dag_last_run_seconds_ago + - metric: 'dag_processing.processor_timeouts' + value: + field: processor_timeouts + - metric: 'scheduler.tasks.without_dagrun' + value: + field: task_without_dagrun + - metric: 'scheduler.tasks.running' + value: + field: task_running + - metric: 'scheduler.tasks.starving' + value: + field: task_starving + - metric: 'scheduler.tasks.executable' + value: + field: task_executable + - metric: 'executor.open_slots' + value: + field: executor_open_slots + - metric: 'executor.queued_tasks' + value: + field: executor_queued_tasks + - metric: 'executor.running_tasks' + value: + field: executor_running_tasks + - metric: 'pool.open_slots.' + labels: + - attr: pool_name + field: pool_name + value: + field: pool_open_slots + - metric: 'pool.queued_slots.' + labels: + - attr: pool_name + field: pool_name + value: + field: pool_queued_slots + - metric: 'pool.running_slots.' + labels: + - attr: pool_name + field: pool_name + value: + field: pool_running_slots + - metric: 'pool.starving_tasks.' + labels: + - attr: pool_name + field: pool_name + value: + field: pool_starving_tasks + - metric: 'smart_sensor_operator.poked_tasks' + value: + field: smart_sensor_operator_poked_tasks + - metric: 'smart_sensor_operator.poked_success' + value: + field: smart_sensor_operator_poked_success + - metric: 'smart_sensor_operator.poked_exception' + value: + field: smart_sensor_operator_poked_exception + - metric: 'smart_sensor_operator.exception_failures' + value: + field: smart_sensor_operator_exception_failures + - metric: 'smart_sensor_operator.infra_failures' + value: + field: smart_sensor_operator_infra_failures + - metric: 'dagrun.dependency-check.' + labels: + - attr: dag_id + field: dag_id + value: + field: dag_dependency_check + - metric: 'dag...duration' + labels: + - attr: dag_id + field: dag_id + - attr: task_id + field: task_id + value: + field: task_duration + - metric: 'dag_processing.last_duration.' + labels: + - attr: dag_file + field: dag_file + value: + field: dag_last_duration + - metric: 'dagrun.duration.success.' + labels: + - attr: dag_id + field: dag_id + value: + field: success_dag_duration + - metric: 'dagrun.duration.failed.' + labels: + - attr: dag_id + field: dag_id + value: + field: failed_dag_duration + - metric: 'dagrun.schedule_delay.' + labels: + - attr: dag_id + field: dag_id + value: + field: dag_schedule_delay + - metric: 'scheduler.critical_section_duration' + value: + field: scheduler_critical_section_duration + - metric: 'dagrun..first_task_scheduling_delay' + labels: + - attr: dag_id + field: dag_id + value: + field: dag_first_task_scheduling_delay +processors: + - add_fields: + when: + has_fields: ["airflow.failed_dag_duration"] + target: '' + fields: + airflow.status: failure + - add_fields: + when: + has_fields: ["airflow.success_dag_duration"] + target: '' + fields: + airflow.status: success + - rename: + fields: + - from: airflow.failed_dag_duration + to: airflow.dag_duration + - from: airflow.success_dag_duration + to: airflow.dag_duration + ignore_missing: true + fail_on_error: false diff --git a/x-pack/metricbeat/module/airflow/test_airflow.py b/x-pack/metricbeat/module/airflow/test_airflow.py new file mode 100644 index 000000000000..fa01115f83ce --- /dev/null +++ b/x-pack/metricbeat/module/airflow/test_airflow.py @@ -0,0 +1,63 @@ +import os +import socket +import sys +from xpack_metricbeat import XPackTest, metricbeat + +STATSD_HOST = '127.0.0.1' +STATSD_PORT = 8126 + +METRIC_MESSAGE = bytes('dagrun.duration.failed.a_dagid:200|ms|#k1:v1,k2:v2', 'utf-8') + + +class Test(XPackTest): + + def test_server(self): + """ + airflow statsd metricset test + """ + + # Start the application + self.render_config_template(modules=[{ + "name": "airflow", + "metricsets": ["statsd"], + "period": "5s", + "host": STATSD_HOST, + "port": STATSD_PORT, + }]) + proc = self.start_beat(home=self.beat_path) + self.wait_until(lambda: self.log_contains("Started listening for UDP")) + + # Send UDP packet with metric + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.sendto(METRIC_MESSAGE, (STATSD_HOST, STATSD_PORT)) + sock.close() + + self.wait_until(lambda: self.output_lines() > 0) + proc.check_kill_and_wait() + self.assert_no_logged_warnings(replace='use of closed network connection') + + # Verify output + output = self.read_output_json() + self.assertGreater(len(output), 0) + evt = output[0] + + del evt["airflow"]["dag_duration"]["mean_rate"] # floating + del evt["airflow"]["dag_duration"]["1m_rate"] # floating + del evt["airflow"]["dag_duration"]["5m_rate"] # floating + del evt["airflow"]["dag_duration"]["15m_rate"] # floating + + assert evt["airflow"]["dag_id"] == "a_dagid" + assert evt["airflow"]["status"] == "failure" + assert evt["airflow"]["dag_duration"] == { + "p99_9": 200, + "count": 1, + "median": 200, + "p99": 200, + "p95": 200, + "min": 200, + "stddev": 0, + "p75": 200, + "max": 200, + "mean": 200, + } + self.assert_fields_are_documented(evt) diff --git a/x-pack/metricbeat/module/statsd/_meta/docs.asciidoc b/x-pack/metricbeat/module/statsd/_meta/docs.asciidoc index 22a85f0d0fd8..34ccee20d3fc 100644 --- a/x-pack/metricbeat/module/statsd/_meta/docs.asciidoc +++ b/x-pack/metricbeat/module/statsd/_meta/docs.asciidoc @@ -25,7 +25,28 @@ The `statsd` module has these additional config options: Irrespective of the given ttl, metrics will be reported at least once. A ttl of zero means metrics will never expire. -[float] +*`statsd.mapping`*:: It defines how metrics will mapped from the original metric label to the event json. +Here's an example configuration: +[source,yaml] +---- +statsd.mappings: + - metric: 'ti_failures' <1> + value: + field: task_failures <2> + - metric: '_start' <1> + labels: + - attr: job_name <3> + field: job_name <4> + value: + field: started <2> +---- + +<1> `metric`, required: the label key of the metric in statsd, either as a exact match string +or as a template with named label placeholder in the format `` +<2> `value.field`, required: field name where to save the metric value in the event json +<3> `label[].attr`, required when using named label placeholder: reference to the named label placeholder defined in `metric` +<4> `label[].field`, required when using named label placeholder field name where to save the named label placeholder value from the template in the event json + === Metricsets Currently, there is only `server` metricset in `statsd` module. diff --git a/x-pack/metricbeat/module/statsd/server/data.go b/x-pack/metricbeat/module/statsd/server/data.go index 9c8613e0c17b..9935d4db6e50 100644 --- a/x-pack/metricbeat/module/statsd/server/data.go +++ b/x-pack/metricbeat/module/statsd/server/data.go @@ -11,6 +11,7 @@ import ( "github.com/pkg/errors" + "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/metricbeat/helper/server" ) @@ -99,6 +100,48 @@ func parse(b []byte) ([]statsdMetric, error) { return metrics, nil } +func eventMapping(metricName string, metricValue interface{}, metricSetFields common.MapStr, mappings map[string]StatsdMapping) { + if len(mappings) == 0 { + metricSetFields[common.DeDot(metricName)] = metricValue + return + } + + for _, mapping := range mappings { + // The metricname match the one with no labels in mappings + // Let's insert it dedotted and continue + if metricName == mapping.Metric { + metricSetFields[mapping.Value.Field] = metricValue + return + } + + res := mapping.regex.FindStringSubmatch(metricName) + + // Not all labels match + // Skip and continue to next mapping + if len(res) != (len(mapping.Labels) + 1) { + logger.Debugf("not all labels match in statsd.mapping, skipped") + continue + } + + // Let's add the metric set fields from labels + names := mapping.regex.SubexpNames() + for i, _ := range res { + for _, label := range mapping.Labels { + if label.Attr != names[i] { + continue + } + + metricSetFields[label.Field] = res[i] + } + } + + // Let's add the metric with the value field + metricSetFields[mapping.Value.Field] = metricValue + } + + return +} + func newMetricProcessor(ttl time.Duration) *metricProcessor { return &metricProcessor{ registry: ®istry{metrics: map[string]map[string]*metric{}, ttl: ttl}, diff --git a/x-pack/metricbeat/module/statsd/server/data_test.go b/x-pack/metricbeat/module/statsd/server/data_test.go index eeb062cf00ad..2b5d19dd3a2b 100644 --- a/x-pack/metricbeat/module/statsd/server/data_test.go +++ b/x-pack/metricbeat/module/statsd/server/data_test.go @@ -5,17 +5,852 @@ package server import ( + "fmt" "testing" "time" + "gopkg.in/yaml.v2" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/metricbeat/helper/server" + "github.com/elastic/beats/v7/metricbeat/mb" mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing" ) +func init() { + mb.Registry.SetSecondarySource(mb.NewLightModulesSource("../../../module")) +} + +func TestEventMapping(t *testing.T) { + mappingsYml := ` + - metric: '_start' + labels: + - attr: job_name + field: job_name + value: + field: started + - metric: '_end' + labels: + - attr: job_name + field: job_name + value: + field: ended + - metric: _heartbeat_failure + labels: + - attr: job_name + field: job_name + value: + field: heartbeat_failure + - metric: 'operator_failures_' + labels: + - attr: operator_name + field: operator_name + value: + field: failures + - metric: 'operator_successes_' + labels: + - attr: operator_name + field: operator_name + value: + field: successes + - metric: 'ti_failures' + value: + field: task_failures + - metric: 'ti_successes' + value: + field: task_successes + - metric: 'previously_succeeded' + value: + field: previously_succeeded + - metric: 'zombies_killed' + value: + field: zombies_killed + - metric: 'scheduler_heartbeat' + value: + field: scheduler_heartbeat + - metric: 'dag_processing.manager_stalls' + value: + field: dag_file_processor_manager_stalls + - metric: 'dag_file_refresh_error' + value: + field: dag_file_refresh_error + - metric: 'dag_processing.processes' + value: + field: dag_processes + - metric: 'scheduler.tasks.killed_externally' + value: + field: task_killed_externally + - metric: 'scheduler.tasks.running' + value: + field: task_running + - metric: 'scheduler.tasks.starving' + value: + field: task_starving + - metric: 'scheduler.orphaned_tasks.cleared' + value: + field: task_orphaned_cleared + - metric: 'scheduler.orphaned_tasks.adopted' + value: + field: task_orphaned_adopted + - metric: 'scheduler.critical_section_busy' + value: + field: scheduler_critical_section_busy + - metric: 'sla_email_notification_failure' + value: + field: sla_email_notification_failure + - metric: 'ti.start..' + labels: + - attr: dagid + field: dag_id + - attr: taskid + field: task_id + value: + field: task_started + - metric: 'ti.finish...' + labels: + - attr: dagid + field: dag_id + - attr: taskid + field: task_id + - attr: status + field: status + value: + field: task_finished + - metric: 'dag.callback_exceptions' + value: + field: dag_callback_exceptions + - metric: 'celery.task_timeout_error' + value: + field: task_celery_timeout_error + - metric: 'task_removed_from_dag.' + labels: + - attr: dagid + field: dag_id + value: + field: task_removed + - metric: 'task_restored_to_dag.' + labels: + - attr: dagid + field: dag_id + value: + field: task_restored + - metric: 'task_instance_created-' + labels: + - attr: operator_name + field: operator_name + value: + field: task_created + - metric: 'dagbag_size' + value: + field: dag_bag_size + - metric: 'dag_processing.import_errors' + value: + field: dag_import_errors + - metric: 'dag_processing.total_parse_time' + value: + field: dag_total_parse_time + - metric: 'dag_processing.last_runtime.' + labels: + - attr: dag_file + field: dag_file + value: + field: dag_last_runtime + - metric: 'dag_processing.last_run.seconds_ago.' + labels: + - attr: dag_file + field: dag_file + value: + field: dag_last_run_seconds_ago + - metric: 'dag_processing.processor_timeouts' + value: + field: processor_timeouts + - metric: 'scheduler.tasks.without_dagrun' + value: + field: task_without_dagrun + - metric: 'scheduler.tasks.running' + value: + field: task_running + - metric: 'scheduler.tasks.starving' + value: + field: task_starving + - metric: 'scheduler.tasks.executable' + value: + field: task_executable + - metric: 'executor.open_slots' + value: + field: executor_open_slots + - metric: 'executor.queued_tasks' + value: + field: executor_queued_tasks + - metric: 'executor.running_tasks' + value: + field: executor_running_tasks + - metric: 'pool.open_slots.' + labels: + - attr: pool_name + field: pool_name + value: + field: pool_open_slots + - metric: 'pool.queued_slots.' + labels: + - attr: pool_name + field: pool_name + value: + field: pool_queued_slots + - metric: 'pool.running_slots.' + labels: + - attr: pool_name + field: pool_name + value: + field: pool_running_slots + - metric: 'pool.starving_tasks.' + labels: + - attr: pool_name + field: pool_name + value: + field: pool_starving_tasks + - metric: 'smart_sensor_operator.poked_tasks' + value: + field: smart_sensor_operator_poked_tasks + - metric: 'smart_sensor_operator.poked_success' + value: + field: smart_sensor_operator_poked_success + - metric: 'smart_sensor_operator.poked_exception' + value: + field: smart_sensor_operator_poked_exception + - metric: 'smart_sensor_operator.exception_failures' + value: + field: smart_sensor_operator_exception_failures + - metric: 'smart_sensor_operator.infra_failures' + value: + field: smart_sensor_operator_infra_failures + - metric: 'dagrun.dependency-check.' + labels: + - attr: dag_id + field: dag_id + value: + field: dag_dependency_check + - metric: 'dag...duration' + labels: + - attr: dag_id + field: dag_id + - attr: task_id + field: task_id + value: + field: task_duration + - metric: 'dag_processing.last_duration.' + labels: + - attr: dag_file + field: dag_file + value: + field: dag_last_duration + - metric: 'dagrun.duration.success.' + labels: + - attr: dag_id + field: dag_id + value: + field: success_dag_duration + - metric: 'dagrun.duration.failed.' + labels: + - attr: dag_id + field: dag_id + value: + field: failed_dag_duration + - metric: 'dagrun.schedule_delay.' + labels: + - attr: dag_id + field: dag_id + value: + field: dag_schedule_delay + - metric: 'scheduler.critical_section_duration' + value: + field: scheduler_critical_section_duration + - metric: 'dagrun..first_task_scheduling_delay' + labels: + - attr: dag_id + field: dag_id + value: + field: dag_first_task_scheduling_delay + ` + + var mappings []StatsdMapping + _ = yaml.Unmarshal([]byte(mappingsYml), &mappings) + + countValue := map[string]interface{}{"count": 4} + timerValue := map[string]interface{}{ + "stddev": 0, + "p99_9": 100, + "mean_rate": 0.2689038235718098, + "max": 100, + "mean": 100, + "p95": 100, + "min": 100, + "median": 100, + "p75": 100, + "p99": 100, + "5m_rate": 0.2, + "count": 1, + "1m_rate": 0.2, + "15m_rate": 0.2, + } + + gaugeValue := map[string]interface{}{"value": 2} + + for _, test := range []struct { + metricName string + metricValue interface{} + expected common.MapStr + }{ + { + metricName: "a_job_name_start", + metricValue: countValue, + expected: common.MapStr{ + "job_name": "a_job_name", + "started": countValue, + }, + }, + { + metricName: "a_job_name_end", + metricValue: countValue, + expected: common.MapStr{ + "job_name": "a_job_name", + "ended": countValue, + }, + }, + { + metricName: "a_job_name_heartbeat_failure", + metricValue: countValue, + expected: common.MapStr{ + "job_name": "a_job_name", + "heartbeat_failure": countValue, + }, + }, + { + metricName: "operator_failures_an_operator_name", + metricValue: countValue, + expected: common.MapStr{ + "operator_name": "an_operator_name", + "failures": countValue, + }, + }, + { + metricName: "operator_successes_an_operator_name", + metricValue: countValue, + expected: common.MapStr{ + "operator_name": "an_operator_name", + "successes": countValue, + }, + }, + { + metricName: "ti_failures", + metricValue: countValue, + expected: common.MapStr{ + "task_failures": countValue, + }, + }, + { + metricName: "ti_successes", + metricValue: countValue, + expected: common.MapStr{ + "task_successes": countValue, + }, + }, + { + metricName: "previously_succeeded", + metricValue: countValue, + expected: common.MapStr{ + "previously_succeeded": countValue, + }, + }, + { + metricName: "zombies_killed", + metricValue: countValue, + expected: common.MapStr{ + "zombies_killed": countValue, + }, + }, + { + metricName: "scheduler_heartbeat", + metricValue: countValue, + expected: common.MapStr{ + "scheduler_heartbeat": countValue, + }, + }, + { + metricName: "dag_processing.processes", + metricValue: countValue, + expected: common.MapStr{ + "dag_processes": countValue, + }, + }, + { + metricName: "dag_processing.manager_stalls", + metricValue: countValue, + expected: common.MapStr{ + "dag_file_processor_manager_stalls": countValue, + }, + }, + { + metricName: "dag_file_refresh_error", + metricValue: countValue, + expected: common.MapStr{ + "dag_file_refresh_error": countValue, + }, + }, + { + metricName: "scheduler.tasks.killed_externally", + metricValue: countValue, + expected: common.MapStr{ + "task_killed_externally": countValue, + }, + }, + { + metricName: "scheduler.tasks.running", + metricValue: countValue, + expected: common.MapStr{ + "task_running": countValue, + }, + }, + { + metricName: "scheduler.tasks.starving", + metricValue: countValue, + expected: common.MapStr{ + "task_starving": countValue, + }, + }, + { + metricName: "scheduler.orphaned_tasks.cleared", + metricValue: countValue, + expected: common.MapStr{ + "task_orphaned_cleared": countValue, + }, + }, + { + metricName: "scheduler.orphaned_tasks.adopted", + metricValue: countValue, + expected: common.MapStr{ + "task_orphaned_adopted": countValue, + }, + }, + { + metricName: "scheduler.critical_section_busy", + metricValue: countValue, + expected: common.MapStr{ + "scheduler_critical_section_busy": countValue, + }, + }, + { + metricName: "sla_email_notification_failure", + metricValue: countValue, + expected: common.MapStr{ + "sla_email_notification_failure": countValue, + }, + }, + { + metricName: "ti.start.a_dagid.a_taskid", + metricValue: countValue, + expected: common.MapStr{ + "dag_id": "a_dagid", + "task_id": "a_taskid", + "task_started": countValue, + }, + }, + { + metricName: "ti.finish.a_dagid.a_taskid.a_status", + metricValue: countValue, + expected: common.MapStr{ + "dag_id": "a_dagid", + "task_id": "a_taskid", + "status": "a_status", + "task_finished": countValue, + }, + }, + { + metricName: "dag.callback_exceptions", + metricValue: countValue, + expected: common.MapStr{ + "dag_callback_exceptions": countValue, + }, + }, + { + metricName: "celery.task_timeout_error", + metricValue: countValue, + expected: common.MapStr{ + "task_celery_timeout_error": countValue, + }, + }, + { + metricName: "task_removed_from_dag.a_dagid", + metricValue: countValue, + expected: common.MapStr{ + "dag_id": "a_dagid", + "task_removed": countValue, + }, + }, + { + metricName: "task_restored_to_dag.a_dagid", + metricValue: countValue, + expected: common.MapStr{ + "dag_id": "a_dagid", + "task_restored": countValue, + }, + }, + { + metricName: "task_instance_created-an_operator_name", + metricValue: countValue, + expected: common.MapStr{ + "operator_name": "an_operator_name", + "task_created": countValue, + }, + }, + { + metricName: "dagbag_size", + metricValue: gaugeValue, + expected: common.MapStr{ + "dag_bag_size": gaugeValue, + }, + }, + { + metricName: "dag_processing.import_errors", + metricValue: gaugeValue, + expected: common.MapStr{ + "dag_import_errors": gaugeValue, + }, + }, + { + metricName: "dag_processing.total_parse_time", + metricValue: gaugeValue, + expected: common.MapStr{ + "dag_total_parse_time": gaugeValue, + }, + }, + { + metricName: "dag_processing.last_runtime.a_dag_file", + metricValue: gaugeValue, + expected: common.MapStr{ + "dag_file": "a_dag_file", + "dag_last_runtime": gaugeValue, + }, + }, + { + metricName: "dag_processing.last_run.seconds_ago.a_dag_file", + metricValue: gaugeValue, + expected: common.MapStr{ + "dag_file": "a_dag_file", + "dag_last_run_seconds_ago": gaugeValue, + }, + }, + { + metricName: "dag_processing.processor_timeouts", + metricValue: gaugeValue, + expected: common.MapStr{ + "processor_timeouts": gaugeValue, + }, + }, + { + metricName: "scheduler.tasks.without_dagrun", + metricValue: gaugeValue, + expected: common.MapStr{ + "task_without_dagrun": gaugeValue, + }, + }, + { + metricName: "scheduler.tasks.running", + metricValue: gaugeValue, + expected: common.MapStr{ + "task_running": gaugeValue, + }, + }, + { + metricName: "scheduler.tasks.starving", + metricValue: gaugeValue, + expected: common.MapStr{ + "task_starving": gaugeValue, + }, + }, + { + metricName: "scheduler.tasks.executable", + metricValue: gaugeValue, + expected: common.MapStr{ + "task_executable": gaugeValue, + }, + }, + { + metricName: "executor.open_slots", + metricValue: gaugeValue, + expected: common.MapStr{ + "executor_open_slots": gaugeValue, + }, + }, + { + metricName: "executor.queued_tasks", + metricValue: gaugeValue, + expected: common.MapStr{ + "executor_queued_tasks": gaugeValue, + }, + }, + { + metricName: "executor.running_tasks", + metricValue: gaugeValue, + expected: common.MapStr{ + "executor_running_tasks": gaugeValue, + }, + }, + { + metricName: "pool.open_slots.a_pool_name", + metricValue: gaugeValue, + expected: common.MapStr{ + "pool_name": "a_pool_name", + "pool_open_slots": gaugeValue, + }, + }, + { + metricName: "pool.queued_slots.a_pool_name", + metricValue: gaugeValue, + expected: common.MapStr{ + "pool_name": "a_pool_name", + "pool_queued_slots": gaugeValue, + }, + }, + { + metricName: "pool.running_slots.a_pool_name", + metricValue: gaugeValue, + expected: common.MapStr{ + "pool_name": "a_pool_name", + "pool_running_slots": gaugeValue, + }, + }, + { + metricName: "pool.starving_tasks.a_pool_name", + metricValue: gaugeValue, + expected: common.MapStr{ + "pool_name": "a_pool_name", + "pool_starving_tasks": gaugeValue, + }, + }, + { + metricName: "smart_sensor_operator.poked_tasks", + metricValue: gaugeValue, + expected: common.MapStr{ + "smart_sensor_operator_poked_tasks": gaugeValue, + }, + }, + { + metricName: "smart_sensor_operator.poked_success", + metricValue: gaugeValue, + expected: common.MapStr{ + "smart_sensor_operator_poked_success": gaugeValue, + }, + }, + { + metricName: "smart_sensor_operator.poked_exception", + metricValue: gaugeValue, + expected: common.MapStr{ + "smart_sensor_operator_poked_exception": gaugeValue, + }, + }, + { + metricName: "smart_sensor_operator.exception_failures", + metricValue: gaugeValue, + expected: common.MapStr{ + "smart_sensor_operator_exception_failures": gaugeValue, + }, + }, + { + metricName: "smart_sensor_operator.infra_failures", + metricValue: gaugeValue, + expected: common.MapStr{ + "smart_sensor_operator_infra_failures": gaugeValue, + }, + }, + { + metricName: "dagrun.dependency-check.a_dag_id", + metricValue: timerValue, + expected: common.MapStr{ + "dag_id": "a_dag_id", + "dag_dependency_check": timerValue, + }, + }, + { + metricName: "dag.a_dag_id.a_task_id.duration", + metricValue: timerValue, + expected: common.MapStr{ + "dag_id": "a_dag_id", + "task_id": "a_task_id", + "task_duration": timerValue, + }, + }, + { + metricName: "dag_processing.last_duration.a_dag_file", + metricValue: timerValue, + expected: common.MapStr{ + "dag_file": "a_dag_file", + "dag_last_duration": timerValue, + }, + }, + { + metricName: "dagrun.duration.success.a_dag_id", + metricValue: timerValue, + expected: common.MapStr{ + "dag_id": "a_dag_id", + "success_dag_duration": timerValue, + }, + }, + { + metricName: "dagrun.duration.failed.a_dag_id", + metricValue: timerValue, + expected: common.MapStr{ + "dag_id": "a_dag_id", + "failed_dag_duration": timerValue, + }, + }, + { + metricName: "dagrun.schedule_delay.a_dag_id", + metricValue: timerValue, + expected: common.MapStr{ + "dag_id": "a_dag_id", + "dag_schedule_delay": timerValue, + }, + }, + { + metricName: "scheduler.critical_section_duration", + metricValue: timerValue, + expected: common.MapStr{ + "scheduler_critical_section_duration": timerValue, + }, + }, + { + metricName: "dagrun.a_dag_id.first_task_scheduling_delay", + metricValue: timerValue, + expected: common.MapStr{ + "dag_id": "a_dag_id", + "dag_first_task_scheduling_delay": timerValue, + }, + }, + { + metricName: "not_mapped_metric", + metricValue: timerValue, + expected: common.MapStr{}, + }, + } { + t.Run(test.metricName, func(t *testing.T) { + metricSetFields := common.MapStr{} + builtMappings, _ := buildMappings(mappings) + eventMapping(test.metricName, test.metricValue, metricSetFields, builtMappings) + + assert.Equal(t, test.expected, metricSetFields) + }) + } +} + +func TestBuildMappings(t *testing.T) { + for _, test := range []struct { + title string + input string + err error + expected map[string]StatsdMapping + }{ + { + title: "no err", + input: ` + - metric: '_start' + labels: + - attr: job_name + field: job_name + value: + field: started +`, + err: nil, + expected: map[string]StatsdMapping{ + "_start": { + Metric: "_start", + Labels: []Label{ + {Attr: "job_name", Field: "job_name"}, + }, + Value: Value{Field: "started"}, + }, + }, + }, + { + title: "not matching label", + input: ` + - metric: '_start' + labels: + - attr: not_matching + field: job_name + value: + field: started +`, + err: errInvalidMapping{ + metricLabels: []string{"job_name"}, + attrLabels: []string{"not_matching"}, + }, + expected: nil, + }, + { + title: "not existing label", + input: ` + - metric: '_start' + labels: + - attr: job_name + field: job_name + - attr: not_existing + field: not_existing + value: + field: started +`, + err: errInvalidMapping{ + metricLabels: []string{"job_name"}, + attrLabels: []string{"job_name", "not_existing"}, + }, + expected: nil, + }, + { + title: "repeated label", + input: ` + - metric: '__start' + labels: + - attr: job_name + field: repeated_label_field + - attr: job_name + field: repeated_label_field + value: + field: started +`, + err: fmt.Errorf(`repeated label fields "repeated_label_field"`), + expected: nil, + }, + { + title: "colliding field", + input: ` + - metric: '_start' + labels: + - attr: job_name + field: colliding_field + value: + field: colliding_field +`, + err: fmt.Errorf(`collision between label field "colliding_field" and value field "colliding_field"`), + expected: nil, + }, + } { + t.Run(test.title, func(t *testing.T) { + var mappings []StatsdMapping + err := yaml.Unmarshal([]byte(test.input), &mappings) + actual, err := buildMappings(mappings) + for k, v := range actual { + v.regex = nil + actual[k] = v + } + assert.Equal(t, test.err, err, test.input) + assert.Equal(t, test.expected, actual, test.input) + }) + } +} + func TestParseMetrics(t *testing.T) { for _, test := range []struct { input string diff --git a/x-pack/metricbeat/module/statsd/server/mapping.go b/x-pack/metricbeat/module/statsd/server/mapping.go new file mode 100644 index 000000000000..ff0cf332a0c1 --- /dev/null +++ b/x-pack/metricbeat/module/statsd/server/mapping.go @@ -0,0 +1,23 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package server + +import "regexp" + +type StatsdMapping struct { + Metric string + Labels []Label + Value Value + regex *regexp.Regexp +} + +type Value struct { + Field string +} + +type Label struct { + Attr string + Field string +} diff --git a/x-pack/metricbeat/module/statsd/server/server.go b/x-pack/metricbeat/module/statsd/server/server.go index 25f8fd5a78f1..3e9f35d84366 100644 --- a/x-pack/metricbeat/module/statsd/server/server.go +++ b/x-pack/metricbeat/module/statsd/server/server.go @@ -5,6 +5,9 @@ package server import ( + "fmt" + "regexp" + "strings" "time" "github.com/elastic/beats/v7/libbeat/common" @@ -19,14 +22,46 @@ func init() { mb.Registry.MustAddMetricSet("statsd", "server", New, mb.DefaultMetricSet()) } +type errInvalidMapping struct { + metricLabels []string + attrLabels []string +} + +func (e errInvalidMapping) Error() string { + return fmt.Sprintf( + "labels in metric (%s) don't match labels attributes (%s)", + strings.Join(e.metricLabels, ","), + strings.Join(e.attrLabels, ",")) +} + +func newErrInvalidMapping(metricLabels []string, attrLabels []Label) error { + attrLabelsStringSlice := make([]string, len(attrLabels)) + for i, attrLabel := range attrLabels { + attrLabelsStringSlice[i] = attrLabel.Attr + } + + if len(metricLabels) > 0 { + metricLabels = metricLabels[1:] + } else { + metricLabels = []string{} + } + + return errInvalidMapping{ + metricLabels: metricLabels, + attrLabels: attrLabelsStringSlice, + } +} + // Config for the statsd server metricset. type Config struct { - TTL time.Duration `config:"ttl"` + TTL time.Duration `config:"ttl"` + Mappings []StatsdMapping `config:"statsd.mappings"` } func defaultConfig() Config { return Config{ - TTL: time.Second * 30, + TTL: time.Second * 30, + Mappings: nil, } } @@ -38,6 +73,7 @@ type MetricSet struct { mb.BaseMetricSet server serverhelper.Server processor *metricProcessor + mappings map[string]StatsdMapping } // New create a new instance of the MetricSet @@ -55,13 +91,71 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { } processor := newMetricProcessor(config.TTL) + + mappings, err := buildMappings(config.Mappings) + if err != nil { + return nil, fmt.Errorf("invalid mapping configuration for `statsd.mapping`: %w", err) + } return &MetricSet{ BaseMetricSet: base, server: svc, + mappings: mappings, processor: processor, }, nil } +func buildMappings(config []StatsdMapping) (map[string]StatsdMapping, error) { + mappings := make(map[string]StatsdMapping, len(config)) + replacer := strings.NewReplacer(".", `\.`, "<", "(?P<", ">", ">[^.]+)") + for _, mapping := range config { + regexPattern := replacer.Replace(mapping.Metric) + var err error + mapping.regex, err = regexp.Compile(fmt.Sprintf("^%s$", regexPattern)) + if err != nil { + return nil, fmt.Errorf("invalid pattern %s: %w", mapping.Metric, err) + } + + var matchingLabels int + names := mapping.regex.SubexpNames() + if len(names)-1 != len(mapping.Labels) { + return nil, newErrInvalidMapping(names, mapping.Labels) + } + + repeatedLabelFields := make([]string, 0) + uniqueLabelFields := make(map[string]struct{}) + for _, label := range mapping.Labels { + for _, name := range names { + if label.Attr == name { + matchingLabels++ + } + } + + if _, ok := uniqueLabelFields[label.Field]; !ok { + uniqueLabelFields[label.Field] = struct{}{} + } else { + repeatedLabelFields = append(repeatedLabelFields, label.Field) + } + + if label.Field == mapping.Value.Field { + return nil, fmt.Errorf(`collision between label field "%s" and value field "%s"`, label.Field, mapping.Value.Field) + } + } + + if matchingLabels != len(mapping.Labels) { + return nil, newErrInvalidMapping(names, mapping.Labels) + } + + if len(uniqueLabelFields) != len(mapping.Labels) { + return nil, fmt.Errorf(`repeated label fields "%s"`, strings.Join(repeatedLabelFields, ",")) + } + + mappings[mapping.Metric] = mapping + + } + + return mappings, nil +} + func (m *MetricSet) getEvents() []*mb.Event { groups := m.processor.GetAll() events := make([]*mb.Event, len(groups)) @@ -75,13 +169,17 @@ func (m *MetricSet) getEvents() []*mb.Event { sanitizedMetrics := common.MapStr{} for k, v := range tagGroup.metrics { - sanitizedMetrics[common.DeDot(k)] = v + eventMapping(k, v, sanitizedMetrics, m.mappings) + } + + if len(sanitizedMetrics) == 0 { + continue } events[idx] = &mb.Event{ MetricSetFields: sanitizedMetrics, RootFields: common.MapStr{"labels": mapstrTags}, - Namespace: "statsd", + Namespace: m.Module().Name(), } } return events @@ -102,6 +200,10 @@ func (m *MetricSet) Run(reporter mb.PushReporterV2) { return case <-reportPeriod.C: for _, e := range m.getEvents() { + if e == nil { + continue + } + reporter.Event(*e) } case msg := <-m.server.GetEvents(): diff --git a/x-pack/metricbeat/modules.d/airflow.yml.disabled b/x-pack/metricbeat/modules.d/airflow.yml.disabled new file mode 100644 index 000000000000..010b1daadd8c --- /dev/null +++ b/x-pack/metricbeat/modules.d/airflow.yml.disabled @@ -0,0 +1,8 @@ +# Module: airflow +# Docs: https://www.elastic.co/guide/en/beats/metricbeat/master/metricbeat-module-airflow.html + +- module: airflow + host: "localhost" + port: "8126" + #ttl: "30s" + metricsets: [ 'statsd' ]