Skip to content

Commit

Permalink
Do not send empty values in case of errors
Browse files Browse the repository at this point in the history
It used to be that we were sending zero values (e.g. "" for string, 0 for int)
in case of errors getting a particular value.

Fixes elastic#1972.
  • Loading branch information
Tudor Golubenco committed Jul 14, 2016
1 parent 787f45b commit 2197e4d
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 150 deletions.
63 changes: 44 additions & 19 deletions metricbeat/helper/conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,79 +7,104 @@ is returned.
package helper

import (
"fmt"
"strconv"
"strings"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
)

// ToBool converts value to bool. In case of error, returns false
func ToBool(key string, data map[string]string) bool {
func ToBool(key string, data map[string]string, errs map[string]error, path string) bool {

exists := checkExist(key, data)
if !exists {
logp.Err("Key does not exist in in data: %s", key)
if !checkExist(key, data) {
errs[path] = fmt.Errorf("Key %s not found", key)
return false
}

value, err := strconv.ParseBool(data[key])
if err != nil {
logp.Err("Error converting param to bool: %s", key)
errs[path] = fmt.Errorf("Error converting value to bool: '%s'", data[key])
return false
}

return value
}

// ToFloat converts value to float64. In case of error, returns 0.0
func ToFloat(key string, data map[string]string) float64 {
func ToFloat(key string, data map[string]string, errs map[string]error, path string) float64 {

exists := checkExist(key, data)
if !exists {
logp.Err("Key does not exist in in data: %s", key)
if !checkExist(key, data) {
errs[path] = fmt.Errorf("Key %s not found", key)
return 0.0
}

value, err := strconv.ParseFloat(data[key], 64)
if err != nil {
logp.Err("Error converting param to float: %s", key)
errs[path] = fmt.Errorf("Error converting value to float: '%s'", data[key])
value = 0.0
}

return value
}

// ToInt converts value to int. In case of error, returns 0
func ToInt(key string, data map[string]string) int64 {
func ToInt(key string, data map[string]string, errs map[string]error, path string) int64 {

exists := checkExist(key, data)
if !exists {
logp.Err("Key does not exist in in data: %s", key)
if !checkExist(key, data) {
errs[path] = fmt.Errorf("Key %s not found", key)
return 0
}

value, err := strconv.ParseInt(data[key], 10, 64)
if err != nil {
logp.Err("Error converting param to int: %s", key)
errs[path] = fmt.Errorf("Error converting value to int: '%s'", data[key])
return 0
}

return value
}

// ToStr converts value to str. In case of error, returns ""
func ToStr(key string, data map[string]string) string {
func ToStr(key string, data map[string]string, errs map[string]error, path string) string {

exists := checkExist(key, data)
if !exists {
logp.Err("Key does not exist in in data: %s", key)
if !checkExist(key, data) {
errs[path] = fmt.Errorf("Key %s not found", key)
return ""
}

return data[key]
}

func RemoveErroredKeys(event common.MapStr, errs map[string]error) {
for key, err := range errs {
logp.Err("Error on field `%s`: %v", key, err)
if err_ := deleteKey(event, key); err_ != nil {
logp.Err("Error when trying to remove errored key %s: %v", key, err_)
}
}
}

// checkExists checks if a key exists in the given data set
func checkExist(key string, data map[string]string) bool {
_, ok := data[key]
return ok
}

func deleteKey(event common.MapStr, key string) error {
path := strings.Split(key, ".")
ev := event
for i, pathEl := range path {
if i == len(path)-1 {
delete(ev, pathEl)
} else {
var ok bool
ev, ok = ev[pathEl].(common.MapStr)
if !ok {
return fmt.Errorf("Error accessing field %s", pathEl)
}
}
}
return nil
}
44 changes: 23 additions & 21 deletions metricbeat/module/apache/status/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,40 +88,41 @@ func eventMapping(body io.ReadCloser, hostname string) common.MapStr {
}
}

errs := map[string]error{}
event := common.MapStr{
"hostname": hostname,
"total_accesses": h.ToInt("Total Accesses", fullEvent),
"total_kbytes": h.ToInt("Total kBytes", fullEvent),
"requests_per_sec": h.ToFloat("ReqPerSec", fullEvent),
"bytes_per_sec": h.ToFloat("BytesPerSec", fullEvent),
"bytes_per_request": h.ToFloat("BytesPerReq", fullEvent),
"total_accesses": h.ToInt("Total Accesses", fullEvent, errs, "total_accesses"),
"total_kbytes": h.ToInt("Total kBytes", fullEvent, errs, "total_kbytes"),
"requests_per_sec": h.ToFloat("ReqPerSec", fullEvent, errs, "requests_per_sec"),
"bytes_per_sec": h.ToFloat("BytesPerSec", fullEvent, errs, "bytes_per_sec"),
"bytes_per_request": h.ToFloat("BytesPerReq", fullEvent, errs, "bytes_per_request"),
"workers": common.MapStr{
"busy": h.ToInt("BusyWorkers", fullEvent),
"idle": h.ToInt("IdleWorkers", fullEvent),
"busy": h.ToInt("BusyWorkers", fullEvent, errs, "workers.busy"),
"idle": h.ToInt("IdleWorkers", fullEvent, errs, "workers.idle"),
},
"uptime": common.MapStr{
"server_uptime": h.ToInt("ServerUptimeSeconds", fullEvent),
"uptime": h.ToInt("Uptime", fullEvent),
"server_uptime": h.ToInt("ServerUptimeSeconds", fullEvent, errs, "uptime.server_uptime"),
"uptime": h.ToInt("Uptime", fullEvent, errs, "uptime.uptime"),
},
"cpu": common.MapStr{
"load": h.ToFloat("CPULoad", fullEvent),
"user": h.ToFloat("CPUUser", fullEvent),
"system": h.ToFloat("CPUSystem", fullEvent),
"children_user": h.ToFloat("CPUChildrenUser", fullEvent),
"children_system": h.ToFloat("CPUChildrenSystem", fullEvent),
"load": h.ToFloat("CPULoad", fullEvent, errs, "cpu.load"),
"user": h.ToFloat("CPUUser", fullEvent, errs, "cpu.user"),
"system": h.ToFloat("CPUSystem", fullEvent, errs, "cpu.system"),
"children_user": h.ToFloat("CPUChildrenUser", fullEvent, errs, "cpu.children_user"),
"children_system": h.ToFloat("CPUChildrenSystem", fullEvent, errs, "cpu.children_system"),
},
"connections": common.MapStr{
"total": h.ToInt("ConnsTotal", fullEvent),
"total": h.ToInt("ConnsTotal", fullEvent, errs, "connections.total"),
"async": common.MapStr{
"writing": h.ToInt("ConnsAsyncWriting", fullEvent),
"keep_alive": h.ToInt("ConnsAsyncKeepAlive", fullEvent),
"closing": h.ToInt("ConnsAsyncClosing", fullEvent),
"writing": h.ToInt("ConnsAsyncWriting", fullEvent, errs, "connections.async.writing"),
"keep_alive": h.ToInt("ConnsAsyncKeepAlive", fullEvent, errs, "connections.async.keep_alive"),
"closing": h.ToInt("ConnsAsyncClosing", fullEvent, errs, "connections.async.closing"),
},
},
"load": common.MapStr{
"1": h.ToFloat("Load1", fullEvent),
"5": h.ToFloat("Load5", fullEvent),
"15": h.ToFloat("Load15", fullEvent),
"1": h.ToFloat("Load1", fullEvent, errs, "load.1"),
"5": h.ToFloat("Load5", fullEvent, errs, "load.5"),
"15": h.ToFloat("Load15", fullEvent, errs, "load.15"),
},
"scoreboard": common.MapStr{
"starting_up": totalS,
Expand All @@ -138,6 +139,7 @@ func eventMapping(body io.ReadCloser, hostname string) common.MapStr {
"total": totalAll,
},
}
h.RemoveErroredKeys(event, errs)

return event
}
Expand Down
40 changes: 21 additions & 19 deletions metricbeat/module/mysql/status/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,43 +9,45 @@ import (
// This is only a subset of the available values
func eventMapping(status map[string]string) common.MapStr {

errs := map[string]error{}
event := common.MapStr{
"aborted": common.MapStr{
"clients": h.ToInt("Aborted_clients", status),
"connects": h.ToInt("Aborted_connects", status),
"clients": h.ToInt("Aborted_clients", status, errs, "aborted.clients"),
"connects": h.ToInt("Aborted_connects", status, errs, "aborted.connects"),
},
"binlog": common.MapStr{
"cache": common.MapStr{
"disk_use": h.ToInt("Binlog_cache_disk_use", status),
"use": h.ToInt("Binlog_cache_use", status),
"disk_use": h.ToInt("Binlog_cache_disk_use", status, errs, "binlog.cache.disk_use"),
"use": h.ToInt("Binlog_cache_use", status, errs, "binlog.cache.use"),
},
},
"bytes": common.MapStr{
"received": h.ToInt("Bytes_received", status),
"sent": h.ToInt("Bytes_sent", status),
"received": h.ToInt("Bytes_received", status, errs, "bytes.received"),
"sent": h.ToInt("Bytes_sent", status, errs, "bytes.sent"),
},
"connections": h.ToInt("Connections", status),
"connections": h.ToInt("Connections", status, errs, "connections"),
"created": common.MapStr{
"tmp": common.MapStr{
"disk_tables": h.ToInt("Created_tmp_disk_tables", status),
"files": h.ToInt("Created_tmp_files", status),
"tables": h.ToInt("Created_tmp_tables", status),
"disk_tables": h.ToInt("Created_tmp_disk_tables", status, errs, "created.tmp.disk_tables"),
"files": h.ToInt("Created_tmp_files", status, errs, "created.tmp.files"),
"tables": h.ToInt("Created_tmp_tables", status, errs, "created.tmp.tables"),
},
},
"delayed": common.MapStr{
"errors": h.ToInt("Delayed_errors", status),
"insert_threads": h.ToInt("Delayed_insert_threads", status),
"writes": h.ToInt("Delayed_writes", status),
"errors": h.ToInt("Delayed_errors", status, errs, "delayed.errors"),
"insert_threads": h.ToInt("Delayed_insert_threads", status, errs, "delayed.insert_threads"),
"writes": h.ToInt("Delayed_writes", status, errs, "delayed.writes"),
},
"flush_commands": h.ToInt("Flush_commands", status),
"max_used_connections": h.ToInt("Max_used_connections", status),
"flush_commands": h.ToInt("Flush_commands", status, errs, "flush_commands"),
"max_used_connections": h.ToInt("Max_used_connections", status, errs, "max_used_connections"),
"open": common.MapStr{
"files": h.ToInt("Open_files", status),
"streams": h.ToInt("Open_streams", status),
"tables": h.ToInt("Open_tables", status),
"files": h.ToInt("Open_files", status, errs, "open.files"),
"streams": h.ToInt("Open_streams", status, errs, "open.streams"),
"tables": h.ToInt("Open_tables", status, errs, "open.tables"),
},
"opened_tables": h.ToInt("Opened_tables", status),
"opened_tables": h.ToInt("Opened_tables", status, errs, "opened_tables"),
}
h.RemoveErroredKeys(event, errs)

return event
}
Loading

0 comments on commit 2197e4d

Please sign in to comment.