Skip to content

Commit

Permalink
Don't include empty values in Metricbeat
Browse files Browse the repository at this point in the history
This is an alternative implementation of elastic#2032. Instead of collecting the errors
and then removing the empty values, we define a schema of conversions and use code
to apply them, so we have the opportunity to handle errors.

Fixes elastic#1972.
  • Loading branch information
Tudor Golubenco committed Jul 14, 2016
1 parent 6177714 commit 4787c0e
Show file tree
Hide file tree
Showing 8 changed files with 298 additions and 196 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha4...master[Check the HEAD d

*Metricbeat*

- Do not send zero values when no value was present in the source. {issue}1972[1972]

*Packetbeat*

*Topbeat*
Expand Down
139 changes: 111 additions & 28 deletions metricbeat/helper/conversion.go
Original file line number Diff line number Diff line change
@@ -1,85 +1,168 @@
/*
Package helper provides conversion functions and a Schema type for turning a
map[string]string into event fields.

Each conversion function takes a key and a map[string]string. It first checks
that the key exists and then converts the value to the target type; if either
step fails, an error is returned instead of a default value. When conversions
are applied through a Schema, a field whose conversion fails is left out of the
event, and the error is logged unless the conversion is marked as Optional.
*/
package helper

import (
"fmt"
"strconv"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
)

// Schema describes how a map[string]string object can be parsed and converted into
// an event. The conversions are described using an (optionally nested) common.MapStr
// whose leaf values are Conv objects.
type Schema struct {
conversions common.MapStr // event field name -> Conv, or a nested common.MapStr of the same shape
}

// Conv describes a single conversion from an entry of the data map to a
// field of the event map.
type Conv struct {
Func Convertor // Function called with Key and the data map to produce the value
Key string // The key to look up in the data map
Optional bool // If true, a failed conversion is not logged as an error
}

// Convertor is the function type used by Conv. It receives the key to look up
// and the data map, and returns the converted value or an error. When applied
// through a Schema, a non-nil error means the field is not added to the event.
type Convertor func(key string, data map[string]string) (interface{}, error)

// NewSchema returns a Schema that applies the given conversions. The map may
// nest further common.MapStr values to produce nested event fields.
func NewSchema(conversions common.MapStr) Schema {
	return Schema{conversions: conversions}
}

// ApplyTo runs the schema's conversions against data and stores every
// successfully converted field in event. The same event map is returned so
// that calls can be chained.
func (s Schema) ApplyTo(event common.MapStr, data map[string]string) common.MapStr {
	applySchemaToEvent(event, data, s.conversions)
	return event
}

// Apply runs the schema's conversions against data and returns the result as
// a freshly allocated map.
func (s Schema) Apply(data map[string]string) common.MapStr {
	event := common.MapStr{}
	return s.ApplyTo(event, data)
}

// applySchemaToEvent walks conversions and fills event from data.
//
// A Conv entry is executed and its value stored under the entry's name; if the
// conversion fails the field is left out, and the error is logged unless the
// Conv is marked Optional. A nested common.MapStr entry is processed
// recursively into a new sub-map, which is always stored under the entry's
// name. Entries of any other type are ignored.
func applySchemaToEvent(event common.MapStr, data map[string]string, conversions common.MapStr) {
	for field, entry := range conversions {
		switch c := entry.(type) {
		case Conv:
			converted, err := c.Func(c.Key, data)
			if err == nil {
				event[field] = converted
				continue
			}
			// Optional conversions fail silently.
			if !c.Optional {
				logp.Err("Error on field '%s': %v", field, err)
			}
		case common.MapStr:
			nested := common.MapStr{}
			applySchemaToEvent(nested, data, c)
			event[field] = nested
		}
	}
}

// ToBool looks up key in data and parses its value as a boolean using
// strconv.ParseBool.
//
// On success it returns the parsed bool and a nil error. If key is not present
// in data, or its value cannot be parsed, it returns false together with a
// non-nil error; the value should be ignored in that case.
func ToBool(key string, data map[string]string) (interface{}, error) {
	// A single comma-ok lookup both tests for presence and fetches the value.
	raw, ok := data[key]
	if !ok {
		return false, fmt.Errorf("Key `%s` not found", key)
	}

	value, err := strconv.ParseBool(raw)
	if err != nil {
		return false, fmt.Errorf("Error converting param to bool: %s", key)
	}

	return value, nil
}

// Bool returns a Conv that reads key from the data map and parses it as a
// boolean with ToBool. Any SchemaOption values are applied to the Conv in order.
func Bool(key string, opts ...SchemaOption) Conv {
	conv := Conv{Func: ToBool, Key: key}
	return setOptions(conv, opts)
}

// ToFloat looks up key in data and parses its value as a float64 using
// strconv.ParseFloat.
//
// On success it returns the parsed float64 and a nil error. If key is not
// present in data, or its value cannot be parsed, it returns 0.0 together with
// a non-nil error; the value should be ignored in that case.
func ToFloat(key string, data map[string]string) (interface{}, error) {
	// A single comma-ok lookup both tests for presence and fetches the value.
	raw, ok := data[key]
	if !ok {
		// Return the float zero value (not a bool) so the placeholder keeps
		// the same dynamic type as a successful result.
		return 0.0, fmt.Errorf("Key `%s` not found", key)
	}

	value, err := strconv.ParseFloat(raw, 64)
	if err != nil {
		return 0.0, fmt.Errorf("Error converting param to float: %s", key)
	}

	return value, nil
}

// Float returns a Conv that reads key from the data map and parses it as a
// float with ToFloat. Any SchemaOption values are applied to the Conv in order.
func Float(key string, opts ...SchemaOption) Conv {
	conv := Conv{Func: ToFloat, Key: key}
	return setOptions(conv, opts)
}

// ToInt looks up key in data and parses its value as a base-10 int64 using
// strconv.ParseInt.
//
// On success it returns the parsed int64 and a nil error. If key is not
// present in data, or its value cannot be parsed, it returns int64(0) together
// with a non-nil error; the value should be ignored in that case.
func ToInt(key string, data map[string]string) (interface{}, error) {
	// A single comma-ok lookup both tests for presence and fetches the value.
	raw, ok := data[key]
	if !ok {
		// Return an int64 zero (not a bool) so the placeholder keeps the same
		// dynamic type as a successful result.
		return int64(0), fmt.Errorf("Key `%s` not found", key)
	}

	value, err := strconv.ParseInt(raw, 10, 64)
	if err != nil {
		// An untyped 0 would be boxed as int; be explicit about int64.
		return int64(0), fmt.Errorf("Error converting param to int: %s", key)
	}

	return value, nil
}

// Int returns a Conv that reads key from the data map and parses it as an
// integer with ToInt. Any SchemaOption values are applied to the Conv in order.
func Int(key string, opts ...SchemaOption) Conv {
	conv := Conv{Func: ToInt, Key: key}
	return setOptions(conv, opts)
}

// ToStr looks up key in data and returns its value unchanged as a string.
//
// On success it returns the string and a nil error. If key is not present in
// data it returns "" together with a non-nil error; the value should be
// ignored in that case.
func ToStr(key string, data map[string]string) (interface{}, error) {
	// A single comma-ok lookup both tests for presence and fetches the value.
	value, ok := data[key]
	if !ok {
		// Return the string zero value (not a bool) so the placeholder keeps
		// the same dynamic type as a successful result.
		return "", fmt.Errorf("Key `%s` not found", key)
	}

	return value, nil
}

// Str returns a Conv that reads key from the data map as a string with ToStr.
// Any SchemaOption values are applied to the Conv in order.
func Str(key string, opts ...SchemaOption) Conv {
	conv := Conv{Func: ToStr, Key: key}
	return setOptions(conv, opts)
}

// checkExist reports whether key is present in data. A key that maps to the
// empty string still counts as present.
func checkExist(key string, data map[string]string) bool {
	if _, found := data[key]; found {
		return true
	}
	return false
}

// SchemaOption is a function that takes a Conv and returns a (possibly
// modified) Conv. Options are passed to the Conv constructors (Bool, Float,
// Int, Str) to set optional parameters of the conversion.
type SchemaOption func(c Conv) Conv

// Optional is a SchemaOption that sets the Optional flag on the Conv. The flag
// suppresses the error log message when the key doesn't exist or its
// conversion results in an error.
func Optional(c Conv) Conv {
	c.Optional = true
	return c
}

// setOptions threads c through every option in opts, in order, and returns
// the resulting Conv. With no options, c is returned unchanged.
func setOptions(c Conv, opts []SchemaOption) Conv {
	for _, apply := range opts {
		c = apply(c)
	}
	return c
}
72 changes: 38 additions & 34 deletions metricbeat/module/apache/status/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,42 @@ var (

// This should match: "CPUSystem: .01"
matchNumber = regexp.MustCompile("(^[0-9a-zA-Z ]+):\\s+(\\d*\\.?\\d+)")

// schema describes how keys of the parsed status data (for example
// "Total Accesses") are converted and placed into the, possibly nested,
// event fields. Conversions marked h.Optional do not log an error when the
// key is missing or cannot be converted.
schema = h.NewSchema(common.MapStr{
"total_accesses": h.Int("Total Accesses"),
"total_kbytes": h.Int("Total kBytes"),
"requests_per_sec": h.Float("ReqPerSec", h.Optional),
"bytes_per_sec": h.Float("BytesPerSec", h.Optional),
"bytes_per_request": h.Float("BytesPerReq", h.Optional),
"workers": common.MapStr{
"busy": h.Int("BusyWorkers"),
"idle": h.Int("IdleWorkers"),
},
"uptime": common.MapStr{
"server_uptime": h.Int("ServerUptimeSeconds"),
"uptime": h.Int("Uptime"),
},
"cpu": common.MapStr{
"load": h.Float("CPULoad", h.Optional),
"user": h.Float("CPUUser"),
"system": h.Float("CPUSystem"),
"children_user": h.Float("CPUChildrenUser"),
"children_system": h.Float("CPUChildrenSystem"),
},
"connections": common.MapStr{
"total": h.Int("ConnsTotal"),
"async": common.MapStr{
"writing": h.Int("ConnsAsyncWriting"),
"keep_alive": h.Int("ConnsAsyncKeepAlive"),
"closing": h.Int("ConnsAsyncClosing"),
},
},
"load": common.MapStr{
"1": h.Float("Load1"),
"5": h.Float("Load5"),
"15": h.Float("Load15"),
},
})
)

// Map body to MapStr
Expand Down Expand Up @@ -89,40 +125,7 @@ func eventMapping(body io.ReadCloser, hostname string) common.MapStr {
}

event := common.MapStr{
"hostname": hostname,
"total_accesses": h.ToInt("Total Accesses", fullEvent),
"total_kbytes": h.ToInt("Total kBytes", fullEvent),
"requests_per_sec": h.ToFloat("ReqPerSec", fullEvent),
"bytes_per_sec": h.ToFloat("BytesPerSec", fullEvent),
"bytes_per_request": h.ToFloat("BytesPerReq", fullEvent),
"workers": common.MapStr{
"busy": h.ToInt("BusyWorkers", fullEvent),
"idle": h.ToInt("IdleWorkers", fullEvent),
},
"uptime": common.MapStr{
"server_uptime": h.ToInt("ServerUptimeSeconds", fullEvent),
"uptime": h.ToInt("Uptime", fullEvent),
},
"cpu": common.MapStr{
"load": h.ToFloat("CPULoad", fullEvent),
"user": h.ToFloat("CPUUser", fullEvent),
"system": h.ToFloat("CPUSystem", fullEvent),
"children_user": h.ToFloat("CPUChildrenUser", fullEvent),
"children_system": h.ToFloat("CPUChildrenSystem", fullEvent),
},
"connections": common.MapStr{
"total": h.ToInt("ConnsTotal", fullEvent),
"async": common.MapStr{
"writing": h.ToInt("ConnsAsyncWriting", fullEvent),
"keep_alive": h.ToInt("ConnsAsyncKeepAlive", fullEvent),
"closing": h.ToInt("ConnsAsyncClosing", fullEvent),
},
},
"load": common.MapStr{
"1": h.ToFloat("Load1", fullEvent),
"5": h.ToFloat("Load5", fullEvent),
"15": h.ToFloat("Load15", fullEvent),
},
"hostname": hostname,
"scoreboard": common.MapStr{
"starting_up": totalS,
"reading_request": totalR,
Expand All @@ -138,6 +141,7 @@ func eventMapping(body io.ReadCloser, hostname string) common.MapStr {
"total": totalAll,
},
}
schema.ApplyTo(event, fullEvent)

return event
}
Expand Down
4 changes: 3 additions & 1 deletion metricbeat/module/apache/status/status_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ func TestFetch(t *testing.T) {
t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), event)

// Check number of fields.
assert.Equal(t, 12, len(event))
if len(event) < 11 {
t.Fatal("Too few top-level elements in the event")
}
}

func TestData(t *testing.T) {
Expand Down
53 changes: 27 additions & 26 deletions metricbeat/module/mysql/status/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,47 +5,48 @@ import (
h "github.com/elastic/beats/metricbeat/helper"
)

// Map data to MapStr of server stats variables: http://dev.mysql.com/doc/refman/5.7/en/server-status-variables.html
// This is only a subset of the available values
func eventMapping(status map[string]string) common.MapStr {

event := common.MapStr{
var (
schema = h.NewSchema(common.MapStr{
"aborted": common.MapStr{
"clients": h.ToInt("Aborted_clients", status),
"connects": h.ToInt("Aborted_connects", status),
"clients": h.Int("Aborted_clients"),
"connects": h.Int("Aborted_connects"),
},
"binlog": common.MapStr{
"cache": common.MapStr{
"disk_use": h.ToInt("Binlog_cache_disk_use", status),
"use": h.ToInt("Binlog_cache_use", status),
"disk_use": h.Int("Binlog_cache_disk_use"),
"use": h.Int("Binlog_cache_use"),
},
},
"bytes": common.MapStr{
"received": h.ToInt("Bytes_received", status),
"sent": h.ToInt("Bytes_sent", status),
"received": h.Int("Bytes_received"),
"sent": h.Int("Bytes_sent"),
},
"connections": h.ToInt("Connections", status),
"connections": h.Int("Connections"),
"created": common.MapStr{
"tmp": common.MapStr{
"disk_tables": h.ToInt("Created_tmp_disk_tables", status),
"files": h.ToInt("Created_tmp_files", status),
"tables": h.ToInt("Created_tmp_tables", status),
"disk_tables": h.Int("Created_tmp_disk_tables"),
"files": h.Int("Created_tmp_files"),
"tables": h.Int("Created_tmp_tables"),
},
},
"delayed": common.MapStr{
"errors": h.ToInt("Delayed_errors", status),
"insert_threads": h.ToInt("Delayed_insert_threads", status),
"writes": h.ToInt("Delayed_writes", status),
"errors": h.Int("Delayed_errors"),
"insert_threads": h.Int("Delayed_insert_threads"),
"writes": h.Int("Delayed_writes"),
},
"flush_commands": h.ToInt("Flush_commands", status),
"max_used_connections": h.ToInt("Max_used_connections", status),
"flush_commands": h.Int("Flush_commands"),
"max_used_connections": h.Int("Max_used_connections"),
"open": common.MapStr{
"files": h.ToInt("Open_files", status),
"streams": h.ToInt("Open_streams", status),
"tables": h.ToInt("Open_tables", status),
"files": h.Int("Open_files"),
"streams": h.Int("Open_streams"),
"tables": h.Int("Open_tables"),
},
"opened_tables": h.ToInt("Opened_tables", status),
}
"opened_tables": h.Int("Opened_tables"),
})
)

return event
// eventMapping converts the MySQL server status variables in status into an
// event by applying the package-level schema. Only a subset of the available
// variables is mapped; for the full list see
// http://dev.mysql.com/doc/refman/5.7/en/server-status-variables.html
func eventMapping(status map[string]string) common.MapStr {
	event := schema.Apply(status)
	return event
}
Loading

0 comments on commit 4787c0e

Please sign in to comment.