Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log structured monitoring metrics #5915

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di
- Moved `ip_port` indexer for `add_kubernetes_metadata` to all beats. {pull}5707[5707]
- `ip_port` indexer now index both IP and IP:port pairs. {pull}5721[5721]
- Add the ability to write structured logs. {pull}5901[5901]
- Use structured logging for the metrics that are periodically logged via the
`logging.metrics` feature. {pull}5915[5915]

*Auditbeat*

Expand Down
47 changes: 14 additions & 33 deletions libbeat/monitoring/report/log/log.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package log

import (
"bytes"
"fmt"
"sort"
"sync"
"time"

Expand All @@ -30,25 +27,22 @@ var gauges = map[string]bool{
"beat.memstats.memory_total": true,
"beat.memstats.memory_alloc": true,
"beat.memstats.gc_next": true,
"beat.info.uptime.ms": true,
}

var (
// startTime is the time that the process was started.
startTime = time.Now()
)

type logger interface {
Infof(format string, v ...interface{})
}

type reporter struct {
wg sync.WaitGroup
done chan struct{}
period time.Duration
registry *monitoring.Registry

// output
logger logger
logger *logp.Logger
}

// MakeReporter returns a new Reporter that periodically reports metrics via
Expand All @@ -64,7 +58,7 @@ func MakeReporter(beat beat.Info, cfg *common.Config) (report.Reporter, error) {
r := &reporter{
done: make(chan struct{}),
period: config.Period,
logger: logp.NewLogger("metrics"),
logger: logp.NewLogger("monitoring"),
registry: monitoring.Default,
}

Expand All @@ -84,7 +78,9 @@ func (r *reporter) Stop() {
func (r *reporter) snapshotLoop() {
r.logger.Infof("Starting metrics logging every %v", r.period)
defer r.logger.Infof("Stopping metrics logging.")
defer r.logTotals(makeSnapshot(r.registry))
defer func() {
r.logTotals(makeDeltaSnapshot(monitoring.MakeFlatSnapshot(), makeSnapshot(r.registry)))
}()

ticker := time.NewTicker(r.period)
defer ticker.Stop()
Expand All @@ -107,15 +103,15 @@ func (r *reporter) snapshotLoop() {

func (r *reporter) logSnapshot(s monitoring.FlatSnapshot) {
if snapshotLen(s) > 0 {
r.logger.Infof("Non-zero metrics in the last %v: %v", r.period, toKeyValuePairs(s))
r.logger.Infow("Non-zero metrics in the last "+r.period.String(), toKeyValuePairs(s)...)
return
}

r.logger.Infof("No non-zero metrics in the last %v", r.period)
}

func (r *reporter) logTotals(s monitoring.FlatSnapshot) {
r.logger.Infof("Total non-zero metrics: %v", toKeyValuePairs(s))
r.logger.Infow("Total non-zero metrics", toKeyValuePairs(s)...)
r.logger.Infof("Uptime: %v", time.Since(startTime))
}

Expand Down Expand Up @@ -162,35 +158,20 @@ func snapshotLen(s monitoring.FlatSnapshot) int {
return len(s.Bools) + len(s.Floats) + len(s.Ints) + len(s.Strings)
}

func toKeyValuePairs(s monitoring.FlatSnapshot) string {
func toKeyValuePairs(s monitoring.FlatSnapshot) []interface{} {
data := make(common.MapStr, snapshotLen(s))
for k, v := range s.Bools {
data[k] = v
data.Put(k, v)
}
for k, v := range s.Floats {
data[k] = v
data.Put(k, v)
}
for k, v := range s.Ints {
data[k] = v
data.Put(k, v)
}
for k, v := range s.Strings {
data[k] = v
data.Put(k, v)
}

keys := make([]string, 0, len(data))
for k := range data {
keys = append(keys, k)
}
sort.Strings(keys)

var buf bytes.Buffer
for _, key := range keys {
if buf.Len() != 0 {
buf.WriteByte(' ')
}
buf.WriteString(key)
buf.WriteString("=")
buf.WriteString(fmt.Sprintf("%v", data[key]))
}
return buf.String()
return []interface{}{logp.Namespace("monitoring"), logp.Reflect("metrics", data)}
}
43 changes: 28 additions & 15 deletions libbeat/monitoring/report/log/log_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package log

import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/monitoring"
)

Expand All @@ -27,14 +27,6 @@ var (
}
)

type fakeLogger struct {
logs []string
}

func (l *fakeLogger) Infof(format string, v ...interface{}) {
l.logs = append(l.logs, fmt.Sprintf(format, v...))
}

// Smoke test.
func TestStartStop(t *testing.T) {
r, err := MakeReporter(beat.Info{}, common.NewConfig())
Expand All @@ -52,11 +44,14 @@ func TestMakeDeltaSnapshot(t *testing.T) {
}

func TestReporterLog(t *testing.T) {
logger := &fakeLogger{}
reporter := reporter{period: 30 * time.Second, logger: logger}
logp.DevelopmentSetup(logp.ToObserverOutput())
reporter := reporter{period: 30 * time.Second, logger: logp.NewLogger("monitoring")}

reporter.logSnapshot(monitoring.FlatSnapshot{})
assert.Equal(t, "No non-zero metrics in the last 30s", logger.logs[0])
logs := logp.ObserverLogs().TakeAll()
if assert.Len(t, logs, 1) {
assert.Equal(t, "No non-zero metrics in the last 30s", logs[0].Message)
}

reporter.logSnapshot(
monitoring.FlatSnapshot{
Expand All @@ -65,9 +60,27 @@ func TestReporterLog(t *testing.T) {
},
},
)
assert.Equal(t, "Non-zero metrics in the last 30s: running=true", logger.logs[1])
logs = logp.ObserverLogs().TakeAll()
if assert.Len(t, logs, 1) {
assert.Equal(t, "Non-zero metrics in the last 30s", logs[0].Message)
assertMapHas(t, logs[0].ContextMap(), "monitoring.metrics.running", true)
}

reporter.logTotals(curSnap)
assert.Equal(t, "Total non-zero metrics: count=20 new=1", logger.logs[2])
assert.Contains(t, logger.logs[3], "Uptime: ")
logs = logp.ObserverLogs().TakeAll()
if assert.Len(t, logs, 2) {
assert.Equal(t, "Total non-zero metrics", logs[0].Message)
assertMapHas(t, logs[0].ContextMap(), "monitoring.metrics.count", 20)
assertMapHas(t, logs[0].ContextMap(), "monitoring.metrics.new", 1)
assert.Contains(t, logs[1].Message, "Uptime: ")
}
}

func assertMapHas(t *testing.T, m map[string]interface{}, key string, expectedValue interface{}) {
t.Helper()
v, err := common.MapStr(m).GetValue(key)
if err != nil {
t.Fatal(err)
}
assert.EqualValues(t, expectedValue, v)
}
4 changes: 2 additions & 2 deletions libbeat/tests/system/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,11 +168,11 @@ def test_logging_metrics(self):
)
proc = self.start_beat(logging_args=["-e"])
self.wait_until(
lambda: self.log_contains("Non-zero metrics in the last 100ms:"),
lambda: self.log_contains("Non-zero metrics in the last 100ms"),
max_timeout=2)
proc.check_kill_and_wait()
self.wait_until(
lambda: self.log_contains("Total non-zero metrics:"),
lambda: self.log_contains("Total non-zero metrics"),
max_timeout=2)

def test_persistent_uuid(self):
Expand Down