diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index f59eeff60a7..c5bab2d15a8 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -39,6 +39,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di - Add config option for windows/perfmon metricset to ignore non existent counters. {pull}6432[6432] - Refactor docker CPU calculations to be more consistent with `docker stats`. {pull}6608[6608] - Update logstash.node_stats metricset to write data under `logstash.node.stats.*`. {pull}6714[6714] +- Fixed typo in values for `state_container` `status.phase`, from `terminate` to `terminated`. {pull}6916[6916] *Packetbeat* @@ -223,6 +224,9 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di *Metricbeat* +- Kubernetes `state_container` `cpu.limit.nanocores` and `cpu.request.nanocores` have been +deprecated in favor of `cpu.*.cores`. {pull}6916[6916] + *Packetbeat* *Winlogbeat* diff --git a/metricbeat/docs/fields.asciidoc b/metricbeat/docs/fields.asciidoc index dadf0167df1..b4d60b5b22b 100644 --- a/metricbeat/docs/fields.asciidoc +++ b/metricbeat/docs/fields.asciidoc @@ -7307,6 +7307,26 @@ Container restarts count -- +*`kubernetes.container.cpu.limit.cores`*:: ++ +-- +type: long + +Container CPU cores limit + + +-- + +*`kubernetes.container.cpu.request.cores`*:: ++ +-- +type: long + +Container CPU requested cores + + +-- + *`kubernetes.container.cpu.limit.nanocores`*:: + -- diff --git a/metricbeat/helper/prometheus.go b/metricbeat/helper/prometheus.go deleted file mode 100644 index 3338cfbeeb0..00000000000 --- a/metricbeat/helper/prometheus.go +++ /dev/null @@ -1,59 +0,0 @@ -package helper - -import ( - "fmt" - "io" - - dto "github.com/prometheus/client_model/go" - "github.com/prometheus/common/expfmt" - - "github.com/elastic/beats/metricbeat/mb" -) - -// Prometheus helper retrieves prometheus formatted metrics -type Prometheus struct { - HTTP -} - -// NewPrometheusClient creates new prometheus helper -func NewPrometheusClient(base mb.BaseMetricSet) (*Prometheus, error) { - http, err := NewHTTP(base) - if err != nil { - return nil, err - } - return &Prometheus{*http}, nil -} - -// GetFamilies requests metric families from prometheus endpoint and returns them -func (p *Prometheus) GetFamilies() ([]*dto.MetricFamily, error) { - resp, err := p.FetchResponse() - if err != nil { - return nil, err - } - defer resp.Body.Close() - - format := expfmt.ResponseFormat(resp.Header) - if format == "" { - return nil, fmt.Errorf("Invalid format for response of response") - } - - decoder := expfmt.NewDecoder(resp.Body, format) - if decoder == nil { - return nil, fmt.Errorf("Unable to create decoder to decode response") - } - - families := []*dto.MetricFamily{} - for { - mf := &dto.MetricFamily{} - err = decoder.Decode(mf) - if err != nil { - if err == io.EOF { - break - } - } else { - families = append(families, mf) - } - } - - return families, nil -} diff --git a/metricbeat/helper/prometheus/label.go b/metricbeat/helper/prometheus/label.go new file mode 100644 index 00000000000..8ac879a9401 --- /dev/null +++ b/metricbeat/helper/prometheus/label.go @@ -0,0 +1,42 @@ +package prometheus + +// LabelMap defines the mapping from Prometheus label to a Metricbeat field +type LabelMap interface { + // GetField returns the resulting field name + GetField() string + + // IsKey returns true if the label is a key label + IsKey() bool +} + +// Label maps a Prometheus label to a Metricbeat field +func Label(field string) LabelMap { + return &commonLabel{ + field: field, + key: false, + } +} + +// KeyLabel maps a Prometheus label to a Metricbeat field. The label is flagged as key. +// Metrics with the same tuple of key labels will be grouped in the same event. +func KeyLabel(field string) LabelMap { + return &commonLabel{ + field: field, + key: true, + } +} + +type commonLabel struct { + field string + key bool +} + +// GetField returns the resulting field name +func (l *commonLabel) GetField() string { + return l.field +} + +// IsKey returns true if the label is a key label +func (l *commonLabel) IsKey() bool { + return l.key +} diff --git a/metricbeat/helper/prometheus/metric.go b/metricbeat/helper/prometheus/metric.go new file mode 100644 index 00000000000..91a5ee8cb6f --- /dev/null +++ b/metricbeat/helper/prometheus/metric.go @@ -0,0 +1,126 @@ +package prometheus + +import ( + "strings" + + dto "github.com/prometheus/client_model/go" +) + +// MetricMap defines the mapping from Prometheus metric to a Metricbeat field +type MetricMap interface { + // GetField returns the resulting field name + GetField() string + + // GetValue returns the resulting value + GetValue(m *dto.Metric) interface{} +} + +// Metric directly maps a Prometheus metric to a Metricbeat field +func Metric(field string) MetricMap { + return &commonMetric{ + field: field, + } +} + +// KeywordMetric maps a Prometheus metric to a Metricbeat field, stores the +// given keyword when source metric value is 1 +func KeywordMetric(field, keyword string) MetricMap { + return &keywordMetric{ + commonMetric{ + field: field, + }, + keyword, + } +} + +// BooleanMetric maps a Prometheus metric to a Metricbeat field of bool type +func BooleanMetric(field string) MetricMap { + return &booleanMetric{ + commonMetric{ + field: field, + }, + } +} + +// LabelMetric maps a Prometheus metric to a Metricbeat field, stores the value +// of a given label on it if the gauge value is 1 +func LabelMetric(field, label string) MetricMap { + return &labelMetric{ + commonMetric{ + field: field, + }, + label, + } +} + +type commonMetric struct { + field string +} + +// GetField returns the resulting field name +func (m *commonMetric) GetField() string { + return m.field +} + +// GetValue returns the resulting value +func (m *commonMetric) GetValue(metric *dto.Metric) interface{} { + counter := metric.GetCounter() + if counter != nil { + return int64(counter.GetValue()) + } + + gauge := metric.GetGauge() + if gauge != nil { + return gauge.GetValue() + } + + // Other types are not supported here + return nil +} + +type keywordMetric struct { + commonMetric + keyword string +} + +// GetValue returns the resulting value +func (m *keywordMetric) GetValue(metric *dto.Metric) interface{} { + if gauge := metric.GetGauge(); gauge != nil && gauge.GetValue() == 1 { + return m.keyword + } + return nil +} + +type booleanMetric struct { + commonMetric +} + +// GetValue returns the resulting value +func (m *booleanMetric) GetValue(metric *dto.Metric) interface{} { + if gauge := metric.GetGauge(); gauge != nil { + return gauge.GetValue() == 1 + } + return nil +} + +type labelMetric struct { + commonMetric + label string +} + +// GetValue returns the resulting value +func (m *labelMetric) GetValue(metric *dto.Metric) interface{} { + if gauge := metric.GetGauge(); gauge != nil && gauge.GetValue() == 1 { + return strings.ToLower(getLabel(metric, m.label)) + } + return nil +} + +func getLabel(metric *dto.Metric, name string) string { + for _, label := range metric.GetLabel() { + if label.GetName() == name { + return label.GetValue() + } + } + return "" +} diff --git a/metricbeat/helper/prometheus/prometheus.go b/metricbeat/helper/prometheus/prometheus.go new file mode 100644 index 00000000000..edd1ec72844 --- /dev/null +++ b/metricbeat/helper/prometheus/prometheus.go @@ -0,0 +1,165 @@ +package prometheus + +import ( + "fmt" + "io" + "net/http" + + dto "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/metricbeat/helper" + "github.com/elastic/beats/metricbeat/mb" +) + +// Prometheus helper retrieves prometheus formatted metrics +type Prometheus interface { + // GetFamilies requests metric families from prometheus endpoint and returns them + GetFamilies() ([]*dto.MetricFamily, error) + + GetProcessedMetrics(mapping *MetricsMapping) ([]common.MapStr, error) +} + +type prometheus struct { + httpfetcher +} + +type httpfetcher interface { + FetchResponse() (*http.Response, error) +} + +// NewPrometheusClient creates new prometheus helper +func NewPrometheusClient(base mb.BaseMetricSet) (Prometheus, error) { + http, err := helper.NewHTTP(base) + if err != nil { + return nil, err + } + return &prometheus{http}, nil +} + +// GetFamilies requests metric families from prometheus endpoint and returns them +func (p *prometheus) GetFamilies() ([]*dto.MetricFamily, error) { + resp, err := p.FetchResponse() + if err != nil { + return nil, err + } + defer resp.Body.Close() + + format := expfmt.ResponseFormat(resp.Header) + if format == "" { + return nil, fmt.Errorf("Invalid format for response of response") + } + + decoder := expfmt.NewDecoder(resp.Body, format) + if decoder == nil { + return nil, fmt.Errorf("Unable to create decoder to decode response") + } + + families := []*dto.MetricFamily{} + for { + mf := &dto.MetricFamily{} + err = decoder.Decode(mf) + if err != nil { + if err == io.EOF { + break + } + } else { + families = append(families, mf) + } + } + + return families, nil +} + +// MetricsMapping defines mapping settings for Prometheus metrics, to be used with `GetProcessedMetrics` +type MetricsMapping struct { + // Metrics translates from from prometheus metric name to Metricbeat fields + Metrics map[string]MetricMap + + // Labels translate from prometheus label names to Metricbeat fields + Labels map[string]LabelMap + + // ExtraFields adds the given fields to all events coming from `GetProcessedMetrics` + ExtraFields map[string]string +} + +func (p *prometheus) GetProcessedMetrics(mapping *MetricsMapping) ([]common.MapStr, error) { + families, err := p.GetFamilies() + if err != nil { + return nil, err + } + + eventsMap := map[string]common.MapStr{} + for _, family := range families { + for _, metric := range family.GetMetric() { + m, ok := mapping.Metrics[family.GetName()] + + // Ignore unknown metrics + if !ok { + continue + } + + field := m.GetField() + value := m.GetValue(metric) + + // Ignore retrieval errors (bad conf) + if value == nil { + continue + } + + // Convert labels + labels := common.MapStr{} + keyLabels := common.MapStr{} + for k, v := range getLabels(metric) { + if l, ok := mapping.Labels[k]; ok { + if l.IsKey() { + keyLabels[l.GetField()] = v + } else { + labels[l.GetField()] = v + } + } + } + + event := getEvent(eventsMap, keyLabels) + // Empty field means we ignore the metric but still process its labels + if field != "" { + event[field] = value + } + + event.Update(labels) + } + } + + // populate events array from values in eventsMap + events := make([]common.MapStr, 0, len(eventsMap)) + for _, event := range eventsMap { + // Add extra fields + for k, v := range mapping.ExtraFields { + event[k] = v + } + + events = append(events, event) + } + return events, nil +} + +func getEvent(m map[string]common.MapStr, labels common.MapStr) common.MapStr { + hash := labels.String() + res, ok := m[hash] + if !ok { + res = labels + m[hash] = res + } + return res +} + +func getLabels(metric *dto.Metric) common.MapStr { + labels := common.MapStr{} + for _, label := range metric.GetLabel() { + if label.GetName() != "" && label.GetValue() != "" { + labels[label.GetName()] = label.GetValue() + } + } + return labels +} diff --git a/metricbeat/helper/prometheus/prometheus_test.go b/metricbeat/helper/prometheus/prometheus_test.go new file mode 100644 index 00000000000..b4637f0c63f --- /dev/null +++ b/metricbeat/helper/prometheus/prometheus_test.go @@ -0,0 +1,189 @@ +package prometheus + +import ( + "bytes" + "io/ioutil" + "net/http" + "net/http/httptest" + "sort" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/common" +) + +const promMetrics = ` +# TYPE first_metric gauge +first_metric{label1="value1",label2="value2",label3="value3"} 1 +# TYPE second_metric gauge +second_metric{label1="value1",label3="othervalue"} 0 +` + +type mockFetcher struct{} + +func (m mockFetcher) FetchResponse() (*http.Response, error) { + return &http.Response{ + Header: make(http.Header), + Body: ioutil.NopCloser(bytes.NewReader([]byte(promMetrics))), + }, nil +} + +func TestPrometheus(t *testing.T) { + server := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(200) + w.Header().Set("Content-Type", "text/plain; charset=ISO-8859-1") + w.Write([]byte(promMetrics)) + })) + + server.Start() + defer server.Close() + + p := &prometheus{mockFetcher{}} + + tests := []struct { + mapping *MetricsMapping + msg string + expected []common.MapStr + }{ + { + msg: "Simple field map", + mapping: &MetricsMapping{ + Metrics: map[string]MetricMap{ + "first_metric": Metric("first.metric"), + }, + }, + expected: []common.MapStr{ + common.MapStr{ + "first.metric": 1.0, + }, + }, + }, + { + msg: "Simple field map with labels", + mapping: &MetricsMapping{ + Metrics: map[string]MetricMap{ + "first_metric": Metric("first.metric"), + }, + Labels: map[string]LabelMap{ + "label1": Label("labels.label1"), + "label2": Label("labels.label2"), + }, + }, + expected: []common.MapStr{ + common.MapStr{ + "first.metric": 1.0, + "labels.label1": "value1", + "labels.label2": "value2", + }, + }, + }, + { + msg: "Several metrics", + mapping: &MetricsMapping{ + Metrics: map[string]MetricMap{ + "first_metric": Metric("first.metric"), + "second_metric": Metric("second.metric"), + }, + Labels: map[string]LabelMap{ + "label3": KeyLabel("labels.label3"), + }, + }, + expected: []common.MapStr{ + common.MapStr{ + "first.metric": 1.0, + "labels.label3": "value3", + }, + common.MapStr{ + "second.metric": 0.0, + "labels.label3": "othervalue", + }, + }, + }, + { + msg: "Grouping by key labels", + mapping: &MetricsMapping{ + Metrics: map[string]MetricMap{ + "first_metric": Metric("first.metric"), + "second_metric": Metric("second.metric"), + }, + Labels: map[string]LabelMap{ + "label1": KeyLabel("labels.label1"), + "label2": Label("labels.label2"), + }, + }, + expected: []common.MapStr{ + common.MapStr{ + "first.metric": 1.0, + "second.metric": 0.0, + "labels.label1": "value1", + "labels.label2": "value2", + }, + }, + }, + { + msg: "Keyword metrics", + mapping: &MetricsMapping{ + Metrics: map[string]MetricMap{ + "first_metric": KeywordMetric("first.metric", "works"), + "second_metric": KeywordMetric("second.metric", "itsnot"), + }, + Labels: map[string]LabelMap{ + "label1": KeyLabel("labels.label1"), + }, + }, + expected: []common.MapStr{ + common.MapStr{ + "first.metric": "works", + "labels.label1": "value1", + }, + }, + }, + { + msg: "Boolean metrics", + mapping: &MetricsMapping{ + Metrics: map[string]MetricMap{ + "first_metric": BooleanMetric("first.metric"), + "second_metric": BooleanMetric("second.metric"), + }, + Labels: map[string]LabelMap{ + "label1": KeyLabel("labels.label1"), + }, + }, + expected: []common.MapStr{ + common.MapStr{ + "first.metric": true, + "second.metric": false, + "labels.label1": "value1", + }, + }, + }, + { + msg: "Label metrics", + mapping: &MetricsMapping{ + Metrics: map[string]MetricMap{ + "first_metric": LabelMetric("first.metric", "label3"), + }, + Labels: map[string]LabelMap{ + "label1": Label("labels.label1"), + }, + }, + expected: []common.MapStr{ + common.MapStr{ + "first.metric": "value3", + "labels.label1": "value1", + }, + }, + }, + } + + for _, test := range tests { + res, err := p.GetProcessedMetrics(test.mapping) + assert.Nil(t, err, test.msg) + // Sort slice to avoid randomness + sort.Slice(res, func(i, j int) bool { + return res[i].String() < res[j].String() + }) + assert.Equal(t, test.expected, res, test.msg) + } +} diff --git a/metricbeat/module/kubernetes/state_container/_meta/fields.yml b/metricbeat/module/kubernetes/state_container/_meta/fields.yml index 799ee460936..4f65932b964 100644 --- a/metricbeat/module/kubernetes/state_container/_meta/fields.yml +++ b/metricbeat/module/kubernetes/state_container/_meta/fields.yml @@ -26,11 +26,21 @@ - name: cpu type: group fields: + - name: limit.cores + type: long + description: > + Container CPU cores limit + - name: request.cores + type: long + description: > + Container CPU requested cores - name: limit.nanocores type: long + deprecated: true description: > Container CPU nanocores limit - name: request.nanocores + deprecated: true type: long description: > Container CPU requested nanocores diff --git a/metricbeat/module/kubernetes/state_container/data.go b/metricbeat/module/kubernetes/state_container/data.go deleted file mode 100644 index d3b8b35df44..00000000000 --- a/metricbeat/module/kubernetes/state_container/data.go +++ /dev/null @@ -1,98 +0,0 @@ -package state_container - -import ( - "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/metricbeat/mb" - "github.com/elastic/beats/metricbeat/module/kubernetes/util" - - dto "github.com/prometheus/client_model/go" -) - -const ( - // Nanocores conversion 10^9 - nanocores = 1000000000 -) - -func eventMapping(families []*dto.MetricFamily) ([]common.MapStr, error) { - eventsMap := map[string]common.MapStr{} - for _, family := range families { - for _, metric := range family.GetMetric() { - container := util.GetLabel(metric, "container") - if container == "" { - continue - } - - namespace := util.GetLabel(metric, "namespace") - pod := util.GetLabel(metric, "pod") - containerKey := namespace + "::" + pod + "::" + container - event, ok := eventsMap[containerKey] - if !ok { - event = common.MapStr{} - eventsMap[containerKey] = event - } - - switch family.GetName() { - case "kube_pod_container_info": - event.Put(mb.ModuleDataKey+".pod.name", util.GetLabel(metric, "pod")) - event.Put(mb.ModuleDataKey+".namespace", util.GetLabel(metric, "namespace")) - event.Put(mb.NamespaceKey, "container") - - event.Put("name", util.GetLabel(metric, "container")) - event.Put("id", util.GetLabel(metric, "container_id")) - event.Put("image", util.GetLabel(metric, "image")) - - case "kube_pod_container_resource_limits_cpu_cores": - event.Put(mb.ModuleDataKey+".node.name", util.GetLabel(metric, "node")) - event.Put("cpu.limit.nanocores", metric.GetGauge().GetValue()*nanocores) - cuid := util.ContainerUID(util.GetLabel(metric, "namespace"), util.GetLabel(metric, "pod"), util.GetLabel(metric, "container")) - util.PerfMetrics.ContainerCoresLimit.Set(cuid, metric.GetGauge().GetValue()) - - case "kube_pod_container_resource_requests_cpu_cores": - event.Put(mb.ModuleDataKey+".node.name", util.GetLabel(metric, "node")) - event.Put("cpu.request.nanocores", metric.GetGauge().GetValue()*nanocores) - - case "kube_pod_container_resource_limits_memory_bytes": - event.Put(mb.ModuleDataKey+".node.name", util.GetLabel(metric, "node")) - event.Put("memory.limit.bytes", metric.GetGauge().GetValue()) - cuid := util.ContainerUID(util.GetLabel(metric, "namespace"), util.GetLabel(metric, "pod"), util.GetLabel(metric, "container")) - util.PerfMetrics.ContainerMemLimit.Set(cuid, metric.GetGauge().GetValue()) - - case "kube_pod_container_resource_requests_memory_bytes": - event.Put(mb.ModuleDataKey+".node.name", util.GetLabel(metric, "node")) - event.Put("memory.request.bytes", metric.GetGauge().GetValue()) - - case "kube_pod_container_status_ready": - event.Put("status.ready", metric.GetGauge().GetValue() == 1) - - case "kube_pod_container_status_restarts": - event.Put("status.restarts", metric.GetCounter().GetValue()) - - case "kube_pod_container_status_running": - if metric.GetGauge().GetValue() == 1 { - event.Put("status.phase", "running") - } - - case "kube_pod_container_status_terminated": - if metric.GetGauge().GetValue() == 1 { - event.Put("status.phase", "terminate") - } - - case "kube_pod_container_status_waiting": - if metric.GetGauge().GetValue() == 1 { - event.Put("status.phase", "waiting") - } - - default: - // Ignore unknown metric - continue - } - } - } - - // initialize, populate events array from values in eventsMap - events := make([]common.MapStr, 0, len(eventsMap)) - for _, event := range eventsMap { - events = append(events, event) - } - return events, nil -} diff --git a/metricbeat/module/kubernetes/state_container/state_container.go b/metricbeat/module/kubernetes/state_container/state_container.go index 9ad7a845e3f..47ae638e15a 100644 --- a/metricbeat/module/kubernetes/state_container/state_container.go +++ b/metricbeat/module/kubernetes/state_container/state_container.go @@ -2,7 +2,7 @@ package state_container import ( "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/metricbeat/helper" + p "github.com/elastic/beats/metricbeat/helper/prometheus" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" ) @@ -10,6 +10,8 @@ import ( const ( defaultScheme = "http" defaultPath = "/metrics" + // Nanocores conversion 10^9 + nanocores = 1000000000 ) var ( @@ -17,6 +19,35 @@ var ( DefaultScheme: defaultScheme, DefaultPath: defaultPath, }.Build() + + mapping = &p.MetricsMapping{ + Metrics: map[string]p.MetricMap{ + "kube_pod_container_info": p.Metric(""), + "kube_pod_container_resource_limits_cpu_cores": p.Metric("cpu.limit.cores"), + "kube_pod_container_resource_requests_cpu_cores": p.Metric("cpu.request.cores"), + "kube_pod_container_resource_limits_memory_bytes": p.Metric("memory.limit.bytes"), + "kube_pod_container_resource_requests_memory_bytes": p.Metric("memory.request.bytes"), + "kube_pod_container_status_ready": p.BooleanMetric("status.ready"), + "kube_pod_container_status_restarts": p.Metric("status.restarts"), + "kube_pod_container_status_running": p.KeywordMetric("status.phase", "running"), + "kube_pod_container_status_terminated": p.KeywordMetric("status.phase", "terminated"), + "kube_pod_container_status_waiting": p.KeywordMetric("status.phase", "waiting"), + }, + + Labels: map[string]p.LabelMap{ + "pod": p.KeyLabel(mb.ModuleDataKey + ".pod.name"), + "container": p.KeyLabel("name"), + "namespace": p.KeyLabel(mb.ModuleDataKey + ".namespace"), + + "node": p.Label(mb.ModuleDataKey + ".node.name"), + "container_id": p.Label("id"), + "image": p.Label("image"), + }, + + ExtraFields: map[string]string{ + mb.NamespaceKey: "container", + }, + } ) // init registers the MetricSet with the central registry. @@ -33,14 +64,14 @@ func init() { // multiple fetch calls. type MetricSet struct { mb.BaseMetricSet - prometheus *helper.Prometheus + prometheus p.Prometheus } // New create a new instance of the MetricSet // Part of new is also setting up the configuration by processing additional // configuration entries if needed. func New(base mb.BaseMetricSet) (mb.MetricSet, error) { - prometheus, err := helper.NewPrometheusClient(base) + prometheus, err := p.NewPrometheusClient(base) if err != nil { return nil, err } @@ -54,10 +85,24 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // It returns the event which is then forward to the output. In case of an error, a // descriptive error must be returned. func (m *MetricSet) Fetch() ([]common.MapStr, error) { - families, err := m.prometheus.GetFamilies() + events, err := m.prometheus.GetProcessedMetrics(mapping) if err != nil { return nil, err } - return eventMapping(families) + for _, event := range events { + if request, ok := event["cpu.request.cores"]; ok { + if requestCores, ok := request.(float64); ok { + event["cpu.request.nanocores"] = requestCores * nanocores + } + } + + if limit, ok := event["cpu.limit.cores"]; ok { + if limitCores, ok := limit.(float64); ok { + event["cpu.limit.nanocores"] = limitCores * nanocores + } + } + } + + return events, err } diff --git a/metricbeat/module/kubernetes/state_container/state_container_test.go b/metricbeat/module/kubernetes/state_container/state_container_test.go index 4de73cc0aa7..fd160aa0817 100644 --- a/metricbeat/module/kubernetes/state_container/state_container_test.go +++ b/metricbeat/module/kubernetes/state_container/state_container_test.go @@ -44,7 +44,7 @@ func TestEventMapping(t *testing.T) { events, err := f.Fetch() assert.NoError(t, err) - assert.Equal(t, 12, len(events), "Wrong number of returned events") + assert.Equal(t, 11, len(events), "Wrong number of returned events") testCases := testCases() for _, event := range events { @@ -99,7 +99,8 @@ func testCases() map[string]map[string]interface{} { "memory.limit.bytes": 178257920, "memory.request.bytes": 73400320, - "cpu.request.nanocores": float64(1e+08), + "cpu.request.cores": 0.1, + "cpu.request.nanocores": 1e+08, }, "test@kube-dns-v20-5g5cb-test@kubedns": { "_namespace": "container", @@ -110,13 +111,14 @@ func testCases() map[string]map[string]interface{} { "id": "docker://fa3d83f648de42492b38fa3e8501d109376f391c50f2bd210c895c8477ae4b62-test", "image": "gcr.io/google_containers/kubedns-amd64:1.9-test", - "status.phase": "terminate", + "status.phase": "terminated", "status.ready": false, "status.restarts": 3, "memory.limit.bytes": 278257920, "memory.request.bytes": 83400320, - "cpu.request.nanocores": float64(2e+08), + "cpu.request.cores": 0.2, + "cpu.request.nanocores": 2e+08, }, "kube-system@kube-dns-v20-5g5cb@healthz": { "_namespace": "container", @@ -133,7 +135,8 @@ func testCases() map[string]map[string]interface{} { "memory.limit.bytes": 52428800, "memory.request.bytes": 52428800, - "cpu.request.nanocores": float64(1e+07), + "cpu.request.cores": 0.01, + "cpu.request.nanocores": 1e+07, }, } } diff --git a/metricbeat/module/kubernetes/state_deployment/data.go b/metricbeat/module/kubernetes/state_deployment/data.go deleted file mode 100644 index a9b21660e19..00000000000 --- a/metricbeat/module/kubernetes/state_deployment/data.go +++ /dev/null @@ -1,66 +0,0 @@ -package state_deployment - -import ( - "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/metricbeat/mb" - "github.com/elastic/beats/metricbeat/module/kubernetes/util" - - dto "github.com/prometheus/client_model/go" -) - -const ( - // Nanocores conversion 10^9 - nanocores = 1000000000 -) - -func eventMapping(families []*dto.MetricFamily) ([]common.MapStr, error) { - eventsMap := map[string]common.MapStr{} - for _, family := range families { - for _, metric := range family.GetMetric() { - deployment := util.GetLabel(metric, "deployment") - if deployment == "" { - continue - } - namespace := util.GetLabel(metric, "namespace") - deploymentKey := namespace + "::" + deployment - event, ok := eventsMap[deploymentKey] - if !ok { - event = common.MapStr{} - eventsMap[deploymentKey] = event - } - - switch family.GetName() { - case "kube_deployment_metadata_generation": - event.Put(mb.ModuleDataKey+".namespace", util.GetLabel(metric, "namespace")) - event.Put(mb.NamespaceKey, "deployment") - event.Put("name", util.GetLabel(metric, "deployment")) - - case "kube_deployment_spec_paused": - event.Put("paused", metric.GetGauge().GetValue() == 1) - - case "kube_deployment_spec_replicas": - event.Put("replicas.desired", metric.GetGauge().GetValue()) - - case "kube_deployment_status_replicas_available": - event.Put("replicas.available", metric.GetGauge().GetValue()) - - case "kube_deployment_status_replicas_unavailable": - event.Put("replicas.unavailable", metric.GetGauge().GetValue()) - - case "kube_deployment_status_replicas_updated": - event.Put("replicas.updated", metric.GetGauge().GetValue()) - - default: - // Ignore unknown metric - continue - } - } - } - - // initialize, populate events array from values in eventsMap - events := make([]common.MapStr, 0, len(eventsMap)) - for _, event := range eventsMap { - events = append(events, event) - } - return events, nil -} diff --git a/metricbeat/module/kubernetes/state_deployment/state_deployment.go b/metricbeat/module/kubernetes/state_deployment/state_deployment.go index f4a51dd9201..7f4bd8d0934 100644 --- a/metricbeat/module/kubernetes/state_deployment/state_deployment.go +++ b/metricbeat/module/kubernetes/state_deployment/state_deployment.go @@ -2,7 +2,8 @@ package state_deployment import ( "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/metricbeat/helper" + + p "github.com/elastic/beats/metricbeat/helper/prometheus" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" ) @@ -17,6 +18,26 @@ var ( DefaultScheme: defaultScheme, DefaultPath: defaultPath, }.Build() + + mapping = &p.MetricsMapping{ + Metrics: map[string]p.MetricMap{ + "kube_deployment_metadata_generation": p.Metric(""), + "kube_deployment_status_replicas_updated": p.Metric("replicas.updated"), + "kube_deployment_status_replicas_unavailable": p.Metric("replicas.unavailable"), + "kube_deployment_status_replicas_available": p.Metric("replicas.available"), + "kube_deployment_spec_replicas": p.Metric("replicas.desired"), + "kube_deployment_spec_paused": p.BooleanMetric("paused"), + }, + + Labels: map[string]p.LabelMap{ + "deployment": p.KeyLabel("name"), + "namespace": p.KeyLabel(mb.ModuleDataKey + ".namespace"), + }, + + ExtraFields: map[string]string{ + mb.NamespaceKey: "deployment", + }, + } ) // init registers the MetricSet with the central registry. @@ -33,14 +54,14 @@ func init() { // multiple fetch calls. type MetricSet struct { mb.BaseMetricSet - prometheus *helper.Prometheus + prometheus p.Prometheus } // New create a new instance of the MetricSet // Part of new is also setting up the configuration by processing additional // configuration entries if needed. func New(base mb.BaseMetricSet) (mb.MetricSet, error) { - prometheus, err := helper.NewPrometheusClient(base) + prometheus, err := p.NewPrometheusClient(base) if err != nil { return nil, err } @@ -54,10 +75,5 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // It returns the event which is then forward to the output. In case of an error, a // descriptive error must be returned. func (m *MetricSet) Fetch() ([]common.MapStr, error) { - families, err := m.prometheus.GetFamilies() - if err != nil { - return nil, err - } - - return eventMapping(families) + return m.prometheus.GetProcessedMetrics(mapping) } diff --git a/metricbeat/module/kubernetes/state_node/data.go b/metricbeat/module/kubernetes/state_node/data.go deleted file mode 100644 index ce336e299a7..00000000000 --- a/metricbeat/module/kubernetes/state_node/data.go +++ /dev/null @@ -1,69 +0,0 @@ -package state_node - -import ( - "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/metricbeat/mb" - "github.com/elastic/beats/metricbeat/module/kubernetes/util" - - dto "github.com/prometheus/client_model/go" -) - -func eventMapping(families []*dto.MetricFamily) ([]common.MapStr, error) { - eventsMap := map[string]common.MapStr{} - for _, family := range families { - for _, metric := range family.GetMetric() { - node := util.GetLabel(metric, "node") - if node == "" { - continue - } - event, ok := eventsMap[node] - if !ok { - event = common.MapStr{} - eventsMap[node] = event - } - switch family.GetName() { - case "kube_node_info": - event.Put(mb.NamespaceKey, "node") - event.Put("name", util.GetLabel(metric, "node")) - - case "kube_node_status_allocatable_cpu_cores": - event.Put("cpu.allocatable.cores", metric.GetGauge().GetValue()) - util.PerfMetrics.NodeCoresAllocatable.Set(util.GetLabel(metric, "node"), metric.GetGauge().GetValue()) - - case "kube_node_status_capacity_cpu_cores": - event.Put("cpu.capacity.cores", metric.GetGauge().GetValue()) - - case "kube_node_status_allocatable_memory_bytes": - event.Put("memory.allocatable.bytes", metric.GetGauge().GetValue()) - util.PerfMetrics.NodeMemAllocatable.Set(util.GetLabel(metric, "node"), metric.GetGauge().GetValue()) - - case "kube_node_status_capacity_memory_bytes": - event.Put("memory.capacity.bytes", metric.GetGauge().GetValue()) - - case "kube_node_status_capacity_pods": - event.Put("pod.capacity.total", metric.GetGauge().GetValue()) - - case "kube_node_status_allocatable_pods": - event.Put("pod.allocatable.total", metric.GetGauge().GetValue()) - - case "kube_node_status_ready": - if metric.GetGauge().GetValue() == 1 { - event.Put("status.ready", util.GetLabel(metric, "condition")) - } - - case "kube_node_spec_unschedulable": - event.Put("status.unschedulable", metric.GetGauge().GetValue() == 1) - - default: - // Ignore unknown metric - continue - } - } - } - - var events []common.MapStr - for _, event := range eventsMap { - events = append(events, event) - } - return events, nil -} diff --git a/metricbeat/module/kubernetes/state_node/state_node.go b/metricbeat/module/kubernetes/state_node/state_node.go index 52f05aa7860..b3c0108aa2c 100644 --- a/metricbeat/module/kubernetes/state_node/state_node.go +++ b/metricbeat/module/kubernetes/state_node/state_node.go @@ -2,7 +2,7 @@ package state_node import ( "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/metricbeat/helper" + p "github.com/elastic/beats/metricbeat/helper/prometheus" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" ) @@ -17,6 +17,28 @@ var ( DefaultScheme: defaultScheme, DefaultPath: defaultPath, }.Build() + + mapping = &p.MetricsMapping{ + Metrics: map[string]p.MetricMap{ + "kube_node_info": p.Metric(""), + "kube_node_status_allocatable_pods": p.Metric("pod.allocatable.total"), + "kube_node_status_capacity_pods": p.Metric("pod.capacity.total"), + "kube_node_status_capacity_memory_bytes": p.Metric("memory.capacity.bytes"), + "kube_node_status_allocatable_memory_bytes": p.Metric("memory.allocatable.bytes"), + "kube_node_status_capacity_cpu_cores": p.Metric("cpu.capacity.cores"), + "kube_node_status_allocatable_cpu_cores": p.Metric("cpu.allocatable.cores"), + "kube_node_spec_unschedulable": p.BooleanMetric("status.unschedulable"), + "kube_node_status_ready": p.LabelMetric("status.ready", "condition"), + }, + + Labels: map[string]p.LabelMap{ + "node": p.KeyLabel("name"), + }, + + ExtraFields: map[string]string{ + mb.NamespaceKey: "node", + }, + } ) // init registers the MetricSet with the central registry. @@ -33,14 +55,14 @@ func init() { // multiple fetch calls. type MetricSet struct { mb.BaseMetricSet - prometheus *helper.Prometheus + prometheus p.Prometheus } // New create a new instance of the MetricSet // Part of new is also setting up the configuration by processing additional // configuration entries if needed. func New(base mb.BaseMetricSet) (mb.MetricSet, error) { - prometheus, err := helper.NewPrometheusClient(base) + prometheus, err := p.NewPrometheusClient(base) if err != nil { return nil, err } @@ -54,10 +76,5 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // It returns the event which is then forward to the output. In case of an error, a // descriptive error must be returned. func (m *MetricSet) Fetch() ([]common.MapStr, error) { - families, err := m.prometheus.GetFamilies() - if err != nil { - return nil, err - } - - return eventMapping(families) + return m.prometheus.GetProcessedMetrics(mapping) } diff --git a/metricbeat/module/kubernetes/state_pod/data.go b/metricbeat/module/kubernetes/state_pod/data.go deleted file mode 100644 index 7a2790c555a..00000000000 --- a/metricbeat/module/kubernetes/state_pod/data.go +++ /dev/null @@ -1,74 +0,0 @@ -package state_pod - -import ( - "strings" - - "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/metricbeat/mb" - "github.com/elastic/beats/metricbeat/module/kubernetes/util" - - dto "github.com/prometheus/client_model/go" -) - -func eventMapping(families []*dto.MetricFamily) ([]common.MapStr, error) { - eventsMap := map[string]common.MapStr{} - for _, family := range families { - for _, metric := range family.GetMetric() { - pod := util.GetLabel(metric, "pod") - if pod == "" { - continue - } - namespace := util.GetLabel(metric, "namespace") - podKey := namespace + "::" + pod - event, ok := eventsMap[podKey] - if !ok { - event = common.MapStr{} - eventsMap[podKey] = event - } - - switch family.GetName() { - case "kube_pod_info": - event.Put(mb.ModuleDataKey+".node.name", util.GetLabel(metric, "node")) - event.Put(mb.ModuleDataKey+".namespace", util.GetLabel(metric, "namespace")) - event.Put(mb.NamespaceKey, "pod") - - event.Put("name", util.GetLabel(metric, "pod")) - - podIP := util.GetLabel(metric, "pod_ip") - hostIP := util.GetLabel(metric, "host_ip") - if podIP != "" { - event.Put("ip", podIP) - } - if hostIP != "" { - event.Put("host_ip", hostIP) - } - - case "kube_pod_status_phase": - if metric.GetGauge().GetValue() == 1 { - event.Put("status.phase", strings.ToLower(util.GetLabel(metric, "phase"))) - } - - case "kube_pod_status_ready": - if metric.GetGauge().GetValue() == 1 { - event.Put("status.ready", util.GetLabel(metric, "condition")) - } - - case "kube_pod_status_scheduled": - if metric.GetGauge().GetValue() == 1 { - event.Put("status.scheduled", util.GetLabel(metric, "condition")) - } - - default: - // Ignore unknown metric - continue - } - } - } - - // initialize, populate events array from values in eventsMap - events := make([]common.MapStr, 0, len(eventsMap)) - for _, event := range eventsMap { - events = append(events, event) - } - return events, nil -} diff --git a/metricbeat/module/kubernetes/state_pod/state_pod.go b/metricbeat/module/kubernetes/state_pod/state_pod.go index 0fbddda4d9a..954966a172f 100644 --- a/metricbeat/module/kubernetes/state_pod/state_pod.go +++ b/metricbeat/module/kubernetes/state_pod/state_pod.go @@ -2,7 +2,7 @@ package state_pod import ( "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/metricbeat/helper" + p "github.com/elastic/beats/metricbeat/helper/prometheus" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" ) @@ -17,6 +17,28 @@ var ( DefaultScheme: defaultScheme, DefaultPath: defaultPath, }.Build() + + mapping = &p.MetricsMapping{ + Metrics: map[string]p.MetricMap{ + "kube_pod_info": p.Metric(""), + "kube_pod_status_phase": p.LabelMetric("status.phase", "phase"), + "kube_pod_status_ready": p.LabelMetric("status.ready", "condition"), + "kube_pod_status_scheduled": p.LabelMetric("status.scheduled", "condition"), + }, + + Labels: map[string]p.LabelMap{ + "pod": p.KeyLabel("name"), + "namespace": p.KeyLabel(mb.ModuleDataKey + ".namespace"), + + "node": p.Label(mb.ModuleDataKey + ".node.name"), + "pod_ip": p.Label("ip"), + "host_ip": p.Label("host_ip"), + }, + + ExtraFields: map[string]string{ + mb.NamespaceKey: "pod", + }, + } ) // init registers the MetricSet with the central registry. @@ -33,14 +55,14 @@ func init() { // multiple fetch calls. type MetricSet struct { mb.BaseMetricSet - prometheus *helper.Prometheus + prometheus p.Prometheus } // New create a new instance of the MetricSet // Part of new is also setting up the configuration by processing additional // configuration entries if needed. func New(base mb.BaseMetricSet) (mb.MetricSet, error) { - prometheus, err := helper.NewPrometheusClient(base) + prometheus, err := p.NewPrometheusClient(base) if err != nil { return nil, err } @@ -54,10 +76,5 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // It returns the event which is then forward to the output. In case of an error, a // descriptive error must be returned. func (m *MetricSet) Fetch() ([]common.MapStr, error) { - families, err := m.prometheus.GetFamilies() - if err != nil { - return nil, err - } - - return eventMapping(families) + return m.prometheus.GetProcessedMetrics(mapping) } diff --git a/metricbeat/module/kubernetes/state_pod/state_pod_test.go b/metricbeat/module/kubernetes/state_pod/state_pod_test.go index 39fa60fcab0..31c2d379eb6 100644 --- a/metricbeat/module/kubernetes/state_pod/state_pod_test.go +++ b/metricbeat/module/kubernetes/state_pod/state_pod_test.go @@ -44,7 +44,7 @@ func TestEventMapping(t *testing.T) { events, err := f.Fetch() assert.NoError(t, err) - assert.Equal(t, 11, len(events), "Wrong number of returned events") + assert.Equal(t, 9, len(events), "Wrong number of returned events") testCases := testCases() for _, event := range events { diff --git a/metricbeat/module/kubernetes/state_replicaset/data.go b/metricbeat/module/kubernetes/state_replicaset/data.go deleted file mode 100644 index b4e82bcf0d0..00000000000 --- a/metricbeat/module/kubernetes/state_replicaset/data.go +++ /dev/null @@ -1,62 +0,0 @@ -package state_replicaset - -import ( - "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/metricbeat/mb" - "github.com/elastic/beats/metricbeat/module/kubernetes/util" - - dto "github.com/prometheus/client_model/go" -) - -func eventMapping(families []*dto.MetricFamily) ([]common.MapStr, error) { - eventsMap := map[string]common.MapStr{} - for _, family := range families { - for _, metric := range family.GetMetric() { - replicaset := util.GetLabel(metric, "replicaset") - if replicaset == "" { - continue - } - namespace := util.GetLabel(metric, "namespace") - replicasetKey := namespace + "::" + replicaset - event, ok := eventsMap[replicasetKey] - if !ok { - event = common.MapStr{} - eventsMap[replicasetKey] = event - } - - switch family.GetName() { - case "kube_replicaset_metadata_generation": - event.Put(mb.ModuleDataKey+".namespace", util.GetLabel(metric, "namespace")) - event.Put(mb.NamespaceKey, "replicaset") - - event.Put("name", util.GetLabel(metric, "replicaset")) - - case "kube_replicaset_status_replicas": - event.Put("replicas.available", metric.GetGauge().GetValue()) - - case "kube_replicaset_spec_replicas": - event.Put("replicas.desired", metric.GetGauge().GetValue()) - - case "kube_replicaset_status_ready_replicas": - event.Put("replicas.ready", metric.GetGauge().GetValue()) - - case "kube_replicaset_status_observed_generation": - event.Put("replicas.observed", metric.GetGauge().GetValue()) - - case "kube_replicaset_status_fully_labeled_replicas": - event.Put("replicas.labeled", metric.GetGauge().GetValue()) - - default: - // Ignore unknown metric - continue - } - } - } - - // initialize, populate events array from values in eventsMap - events := make([]common.MapStr, 0, len(eventsMap)) - for _, event := range eventsMap { - events = append(events, event) - } - return events, nil -} diff --git a/metricbeat/module/kubernetes/state_replicaset/state_replicaset.go b/metricbeat/module/kubernetes/state_replicaset/state_replicaset.go index d9f663b05a9..c3be0e65e38 100644 --- a/metricbeat/module/kubernetes/state_replicaset/state_replicaset.go +++ b/metricbeat/module/kubernetes/state_replicaset/state_replicaset.go @@ -2,7 +2,7 @@ package state_replicaset import ( "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/metricbeat/helper" + p "github.com/elastic/beats/metricbeat/helper/prometheus" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" ) @@ -17,6 +17,26 @@ var ( DefaultScheme: defaultScheme, DefaultPath: defaultPath, }.Build() + + mapping = &p.MetricsMapping{ + Metrics: map[string]p.MetricMap{ + "kube_replicaset_metadata_generation": p.Metric(""), + "kube_replicaset_status_fully_labeled_replicas": p.Metric("replicas.labeled"), + "kube_replicaset_status_observed_generation": p.Metric("replicas.observed"), + "kube_replicaset_status_ready_replicas": p.Metric("replicas.ready"), + "kube_replicaset_spec_replicas": p.Metric("replicas.desired"), + "kube_replicaset_status_replicas": p.Metric("replicas.available"), + }, + + Labels: map[string]p.LabelMap{ + "replicaset": p.KeyLabel("name"), + "namespace": p.KeyLabel(mb.ModuleDataKey + ".namespace"), + }, + + ExtraFields: map[string]string{ + mb.NamespaceKey: "replicaset", + }, + } ) // init registers the MetricSet with the central registry. @@ -33,14 +53,14 @@ func init() { // multiple fetch calls. type MetricSet struct { mb.BaseMetricSet - prometheus *helper.Prometheus + prometheus p.Prometheus } // New create a new instance of the MetricSet // Part of new is also setting up the configuration by processing additional // configuration entries if needed. func New(base mb.BaseMetricSet) (mb.MetricSet, error) { - prometheus, err := helper.NewPrometheusClient(base) + prometheus, err := p.NewPrometheusClient(base) if err != nil { return nil, err } @@ -54,10 +74,5 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // It returns the event which is then forward to the output. In case of an error, a // descriptive error must be returned. func (m *MetricSet) Fetch() ([]common.MapStr, error) { - families, err := m.prometheus.GetFamilies() - if err != nil { - return nil, err - } - - return eventMapping(families) + return m.prometheus.GetProcessedMetrics(mapping) } diff --git a/metricbeat/module/kubernetes/state_statefulset/data.go b/metricbeat/module/kubernetes/state_statefulset/data.go deleted file mode 100644 index aa0d38331fd..00000000000 --- a/metricbeat/module/kubernetes/state_statefulset/data.go +++ /dev/null @@ -1,53 +0,0 @@ -package state_statefulset - -import ( - "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/metricbeat/mb" - "github.com/elastic/beats/metricbeat/module/kubernetes/util" - - dto "github.com/prometheus/client_model/go" -) - -func eventMapping(families []*dto.MetricFamily) ([]common.MapStr, error) { - eventsMap := map[string]common.MapStr{} - for _, family := range families { - for _, metric := range family.GetMetric() { - statefulset := util.GetLabel(metric, "statefulset") - if statefulset == "" { - continue - } - namespace := util.GetLabel(metric, "namespace") - statefulsetKey := namespace + "::" + statefulset - event, ok := eventsMap[statefulsetKey] - if !ok { - event = common.MapStr{} - eventsMap[statefulsetKey] = event - } - switch family.GetName() { - case "kube_statefulset_metadata_generation": - event.Put(mb.ModuleDataKey+".namespace", util.GetLabel(metric, "namespace")) - event.Put(mb.NamespaceKey, "statefulset") - event.Put("name", util.GetLabel(metric, "statefulset")) - event.Put("generation.desired", metric.GetGauge().GetValue()) - case "kube_statefulset_status_observed_generation": - event.Put("generation.observed", metric.GetGauge().GetValue()) - case "kube_statefulset_created": - event.Put("created", metric.GetGauge().GetValue()) - case "kube_statefulset_replicas": - event.Put("replicas.desired", metric.GetGauge().GetValue()) - case "kube_statefulset_status_replicas": - event.Put("replicas.observed", metric.GetGauge().GetValue()) - default: - // Ignore unknown metric - continue - } - } - } - - // initialize, populate events array from values in eventsMap - events := make([]common.MapStr, 0, len(eventsMap)) - for _, event := range eventsMap { - events = append(events, event) - } - return events, nil -} diff --git a/metricbeat/module/kubernetes/state_statefulset/state_statefulset.go b/metricbeat/module/kubernetes/state_statefulset/state_statefulset.go index bfeba9f15de..dadd3bbd146 100644 --- a/metricbeat/module/kubernetes/state_statefulset/state_statefulset.go +++ b/metricbeat/module/kubernetes/state_statefulset/state_statefulset.go @@ -2,7 +2,7 @@ package state_statefulset import ( "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/metricbeat/helper" + p "github.com/elastic/beats/metricbeat/helper/prometheus" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" ) @@ -17,6 +17,25 @@ var ( DefaultScheme: defaultScheme, DefaultPath: defaultPath, }.Build() + + mapping = &p.MetricsMapping{ + Metrics: map[string]p.MetricMap{ + "kube_statefulset_created": p.Metric("created"), + "kube_statefulset_metadata_generation": p.Metric("generation.desired"), + "kube_statefulset_status_observed_generation": p.Metric("generation.observed"), + "kube_statefulset_replicas": p.Metric("replicas.desired"), + "kube_statefulset_status_replicas": p.Metric("replicas.observed"), + }, + + Labels: map[string]p.LabelMap{ + "statefulset": p.KeyLabel("name"), + "namespace": p.KeyLabel(mb.ModuleDataKey + ".namespace"), + }, + + ExtraFields: map[string]string{ + mb.NamespaceKey: "statefulset", + }, + } ) // init registers the MetricSet with the central registry. @@ -33,14 +52,14 @@ func init() { // multiple fetch calls. type MetricSet struct { mb.BaseMetricSet - prometheus *helper.Prometheus + prometheus p.Prometheus } // New create a new instance of the MetricSet // Part of new is also setting up the configuration by processing additional // configuration entries if needed. func New(base mb.BaseMetricSet) (mb.MetricSet, error) { - prometheus, err := helper.NewPrometheusClient(base) + prometheus, err := p.NewPrometheusClient(base) if err != nil { return nil, err } @@ -54,10 +73,5 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // It returns the event which is then forward to the output. In case of an error, a // descriptive error must be returned. func (m *MetricSet) Fetch() ([]common.MapStr, error) { - families, err := m.prometheus.GetFamilies() - if err != nil { - return nil, err - } - - return eventMapping(families) + return m.prometheus.GetProcessedMetrics(mapping) } diff --git a/metricbeat/module/prometheus/collector/collector.go b/metricbeat/module/prometheus/collector/collector.go index 5cb3b026c49..75706a8229f 100644 --- a/metricbeat/module/prometheus/collector/collector.go +++ b/metricbeat/module/prometheus/collector/collector.go @@ -5,7 +5,7 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/cfgwarn" - "github.com/elastic/beats/metricbeat/helper" + p "github.com/elastic/beats/metricbeat/helper/prometheus" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" ) @@ -32,7 +32,7 @@ func init() { type MetricSet struct { mb.BaseMetricSet - prometheus *helper.Prometheus + prometheus p.Prometheus namespace string } @@ -47,7 +47,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return nil, err } - prometheus, err := helper.NewPrometheusClient(base) + prometheus, err := p.NewPrometheusClient(base) if err != nil { return nil, err }