Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Beats state reporting #7075

Merged
merged 5 commits into from
Jul 4, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ https://github.com/elastic/beats/compare/v6.2.3...master[Check the HEAD diff]
- Add support to disable html escaping in outputs. {pull}7445[7445]
- Refactor error handing in schema.Apply(). {pull}7335[7335]
- Add additional types to kubernetes metadata {pull}7457[7457]
- Add module state reporting for X-Pack Monitoring. {pull}7075[7075]

*Auditbeat*

Expand Down
3 changes: 3 additions & 0 deletions auditbeat/auditbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1143,6 +1143,9 @@ logging.files:
# never, once, and freely. Default is never.
#ssl.renegotiation: never

#metrics.period: 10s
#state.period: 1m

#================================ HTTP Endpoint ======================================
# Each beat can expose internal metrics through a HTTP endpoint. For security
# reasons the endpoint is disabled by default. This feature is currently experimental.
Expand Down
3 changes: 3 additions & 0 deletions filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1803,6 +1803,9 @@ logging.files:
# never, once, and freely. Default is never.
#ssl.renegotiation: never

#metrics.period: 10s
#state.period: 1m

#================================ HTTP Endpoint ======================================
# Each beat can expose internal metrics through a HTTP endpoint. For security
# reasons the endpoint is disabled by default. This feature is currently experimental.
Expand Down
3 changes: 3 additions & 0 deletions heartbeat/heartbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1252,6 +1252,9 @@ logging.files:
# never, once, and freely. Default is never.
#ssl.renegotiation: never

#metrics.period: 10s
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@urso I have second thoughts if we even want to add this to the config file, meaning should we even recommend users to change it?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's the reference config. I don't really see a problem in configuring the interval. Allows users to reduce number of documents for monitoring if required.

#state.period: 1m

#================================ HTTP Endpoint ======================================
# Each beat can expose internal metrics through a HTTP endpoint. For security
# reasons the endpoint is disabled by default. This feature is currently experimental.
Expand Down
3 changes: 3 additions & 0 deletions libbeat/_meta/config.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1038,6 +1038,9 @@ logging.files:
# never, once, and freely. Default is never.
#ssl.renegotiation: never

#metrics.period: 10s
#state.period: 1m

#================================ HTTP Endpoint ======================================
# Each beat can expose internal metrics through a HTTP endpoint. For security
# reasons the endpoint is disabled by default. This feature is currently experimental.
Expand Down
12 changes: 11 additions & 1 deletion libbeat/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func Start(cfg *common.Config) {

// register handlers
mux.HandleFunc("/", rootHandler())
mux.HandleFunc("/state", stateHandler)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if it would make more sense to call this /modules, do you plan to add anything else?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm thinking of also adding information about the datasets, the output used. But not sure where it will evolve into. We probably need soon a more general discussion about api endpoint naming and structure.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM, I guess we don't have a hard commit on these names, so we can change them later if needed (?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, API endpoints are not documented so far and we can still change them.

mux.HandleFunc("/stats", statsHandler)
mux.HandleFunc("/dataset", datasetHandler)

Expand All @@ -61,12 +62,21 @@ func rootHandler() func(http.ResponseWriter, *http.Request) {

w.Header().Set("Content-Type", "application/json; charset=utf-8")

data := monitoring.CollectStructSnapshot(monitoring.GetNamespace("state").GetRegistry(), monitoring.Full, false)
data := monitoring.CollectStructSnapshot(monitoring.GetNamespace("info").GetRegistry(), monitoring.Full, false)

print(w, data, r.URL)
}
}

// stateHandler reports state metrics
func stateHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json; charset=utf-8")

data := monitoring.CollectStructSnapshot(monitoring.GetNamespace("state").GetRegistry(), monitoring.Full, false)

print(w, data, r.URL)
}

// statsHandler report expvar and all libbeat/monitoring metrics
func statsHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json; charset=utf-8")
Expand Down
3 changes: 1 addition & 2 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,7 @@ func Run(name, idxPrefix, version string, bt beat.Creator) error {
return err
}

registry := monitoring.NewRegistry()
monitoring.GetNamespace("state").SetRegistry(registry)
registry := monitoring.GetNamespace("info").GetRegistry()
monitoring.NewString(registry, "version").Set(b.Info.Version)
monitoring.NewString(registry, "beat").Set(b.Info.Beat)
monitoring.NewString(registry, "name").Set(b.Info.Name)
Expand Down
53 changes: 35 additions & 18 deletions libbeat/monitoring/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,35 +17,27 @@

package monitoring

import (
"sync"
)
import "sync"

var namespaces = struct {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jsoriano This code change as I also already had it change in this PR. Let me now if my implementation also looks good to you.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

sync.Mutex
m map[string]*Namespace
}{
m: make(map[string]*Namespace),
}
var namespaces = NewNamespaces()

// Namespace contains the name of the namespace and it's registry
type Namespace struct {
name string
registry *Registry
}

func newNamespace(name string) *Namespace {
n := &Namespace{
name: name,
}
return n
}

// GetNamespace gets the namespace with the given name.
// If the namespace does not exist yet, a new one is created.
func GetNamespace(name string) *Namespace {
namespaces.Lock()
defer namespaces.Unlock()

n, ok := namespaces.m[name]
if !ok {
n = &Namespace{name: name}
namespaces.m[name] = n
}
return n
return namespaces.Get(name)
}

// SetRegistry sets the registry of the namespace
Expand All @@ -60,3 +52,28 @@ func (n *Namespace) GetRegistry() *Registry {
}
return n.registry
}

// Namespaces is a list of Namespace structs
type Namespaces struct {
sync.Mutex
namespaces map[string]*Namespace
}

// NewNamespaces creates a new namespaces list
func NewNamespaces() *Namespaces {
return &Namespaces{
namespaces: map[string]*Namespace{},
}
}

// Get returns the namespace for the given key. If the key does not exist, new namespace is created.
func (n *Namespaces) Get(key string) *Namespace {
n.Lock()
defer n.Unlock()
if namespace, ok := n.namespaces[key]; ok {
return namespace
}

n.namespaces[key] = newNamespace(key)
return n.namespaces[key]
}
58 changes: 46 additions & 12 deletions libbeat/monitoring/report/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/monitoring/report"
esout "github.com/elastic/beats/libbeat/outputs/elasticsearch"
"github.com/elastic/beats/libbeat/publisher"
Expand All @@ -38,7 +39,6 @@ var (
actMonitoringBeats = common.MapStr{
"index": common.MapStr{
"_index": "",
"_type": "beats_stats",
"_routing": nil,
},
}
Expand Down Expand Up @@ -96,23 +96,57 @@ func (c *publishClient) Close() error {

func (c *publishClient) Publish(batch publisher.Batch) error {
events := batch.Events()
bulk := make([]interface{}, 0, 2*len(events))
var failed []publisher.Event
var reason error
for _, event := range events {
bulk = append(bulk,
actMonitoringBeats, report.Event{

// Extract time
t, err := event.Content.Meta.GetValue("type")
if err != nil {
logp.Err("Type not available in monitoring reported. Please report this error: %s", err)
continue
}

var params = map[string]string{}
// Copy params
for k, v := range c.params {
params[k] = v
}
// Extract potential additional params
p, err := event.Content.Meta.GetValue("params")
if err == nil {
p2, ok := p.(map[string]string)
if ok {
for k, v := range p2 {
params[k] = v
}
}
}
Copy link

@urso urso Jun 12, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about us copying/allocating contents if required only?

This loop rebuilds params every single time, for every event. If the batch contains multiple events, with only one event having params set, we might loose some params again.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is c.params and params via 'meta' always configured? If not, we can reduce some copying here:

eventParams := func(meta common.MapStr) map[string]string {
  tmp, err := event.Content.Meta.GetValue("params")
  if err != nil {
    return nil
  }
  p, ok := tmp.(map[string]string)
  if !ok {
    return nil
  }
  return p
}

cpyKV := func(to map[string]string, from map[string]string) {
  for k, v := range from {
    to[k] = v
  }
}

params = c.params
if userParams := eventParams(event.Content.Meta); len(userParams) > 0 {
  if len(c.params) > 0 {
    params = make(map[string]string, len(params) + len(userParams))
    cpyKV(params, c.params)
    cpyKV(params, userParams)
  } else {
    params = userParams
  }
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As each event contains the interval param in meta and it's required at the moment, I think we have to reprocess it every time.

What do you mean by we loose params? As it's reprocessed every time and we don't modify c.params, all "base" params should always be there?

actMonitoringBeats.Put("index._type", t)

bulk := [2]interface{}{
actMonitoringBeats,
report.Event{
Timestamp: event.Content.Timestamp,
Fields: event.Content.Fields,
})
}
},
}

_, err := c.es.BulkWith("_xpack", "monitoring", c.params, nil, bulk)
if err != nil {
batch.Retry()
return err
// Currently one request per event is sent. Reason is that each event can contain different
// interval params and X-Pack requires to send the interval param.
_, err = c.es.BulkWith("_xpack", "monitoring", params, nil, bulk[:])
if err != nil {
failed = append(failed, event)
reason = err
}
}

batch.ACK()
return nil
if len(failed) > 0 {
batch.RetryEvents(failed)
} else {
batch.ACK()
}
return reason
}

func (c *publishClient) Test(d testing.Driver) {
Expand Down
6 changes: 4 additions & 2 deletions libbeat/monitoring/report/elasticsearch/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ type config struct {
TLS *tlscommon.Config `config:"ssl"`
MaxRetries int `config:"max_retries"`
Timeout time.Duration `config:"timeout"`
Period time.Duration `config:"period"`
MetricsPeriod time.Duration `config:"metrics.period"`
StatePeriod time.Duration `config:"state.period"`
BulkMaxSize int `config:"bulk_max_size" validate:"min=0"`
BufferSize int `config:"buffer_size"`
Tags []string `config:"tags"`
Expand All @@ -55,7 +56,8 @@ var defaultConfig = config{
TLS: nil,
MaxRetries: 3,
Timeout: 60 * time.Second,
Period: 10 * time.Second,
MetricsPeriod: 10 * time.Second,
StatePeriod: 1 * time.Minute,
BulkMaxSize: 50,
BufferSize: 50,
Tags: nil,
Expand Down
33 changes: 19 additions & 14 deletions libbeat/monitoring/report/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"strings"
"time"

"strconv"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/transport/tlscommon"
Expand All @@ -43,7 +45,6 @@ import (
type reporter struct {
done *stopper

period time.Duration
checkRetry time.Duration

// event metadata
Expand Down Expand Up @@ -102,7 +103,6 @@ func makeReporter(beat beat.Info, cfg *common.Config) (report.Reporter, error) {
for k, v := range config.Params {
params[k] = v
}
params["interval"] = config.Period.String()

out := outputs.Group{
Clients: nil,
Expand All @@ -129,7 +129,7 @@ func makeReporter(beat beat.Info, cfg *common.Config) (report.Reporter, error) {
}), nil
}

monitoring := monitoring.Default.NewRegistry("xpack.monitoring")
monitoring := monitoring.Default.GetRegistry("xpack.monitoring")

pipeline, err := pipeline.New(
beat,
Expand All @@ -150,15 +150,14 @@ func makeReporter(beat beat.Info, cfg *common.Config) (report.Reporter, error) {

r := &reporter{
done: newStopper(),
period: config.Period,
beatMeta: makeMeta(beat),
tags: config.Tags,
checkRetry: checkRetry,
pipeline: pipeline,
client: client,
out: out,
}
go r.initLoop()
go r.initLoop(config)
return r, nil
}

Expand All @@ -168,7 +167,7 @@ func (r *reporter) Stop() {
r.pipeline.Close()
}

func (r *reporter) initLoop() {
func (r *reporter) initLoop(c config) {
debugf("Start monitoring endpoint init loop.")
defer debugf("Finish monitoring endpoint init loop.")

Expand Down Expand Up @@ -199,15 +198,16 @@ func (r *reporter) initLoop() {
logp.Info("Successfully connected to X-Pack Monitoring endpoint.")

// Start collector and send loop if monitoring endpoint has been found.
go r.snapshotLoop()
go r.snapshotLoop("state", c.StatePeriod)
go r.snapshotLoop("stats", c.MetricsPeriod)
}

func (r *reporter) snapshotLoop() {
ticker := time.NewTicker(r.period)
func (r *reporter) snapshotLoop(namespace string, period time.Duration) {
ticker := time.NewTicker(period)
defer ticker.Stop()

logp.Info("Start monitoring metrics snapshot loop.")
defer logp.Info("Stop monitoring metrics snapshot loop.")
logp.Info("Start monitoring %s metrics snapshot loop with period %s.", namespace, period)
defer logp.Info("Stop monitoring %s metrics snapshot loop.", namespace)

for {
var ts time.Time
Expand All @@ -218,23 +218,28 @@ func (r *reporter) snapshotLoop() {
case ts = <-ticker.C:
}

snapshot := makeSnapshot(monitoring.Default)
snapshot := makeSnapshot(monitoring.GetNamespace(namespace).GetRegistry())
if snapshot == nil {
debugf("Empty snapshot.")
continue
}

fields := common.MapStr{
"beat": r.beatMeta,
"metrics": snapshot,
namespace: snapshot,
}
if len(r.tags) > 0 {
fields["tags"] = r.tags
}

r.client.Publish(beat.Event{
Timestamp: ts,
Fields: fields,
Meta: common.MapStr{
"type": "beats_" + namespace,
"interval_ms": int64(period / time.Millisecond),
// Converting to seconds as interval only accepts `s` as unit
"params": map[string]string{"interval": strconv.Itoa(int(period/time.Second)) + "s"},
},
})
}
}
Expand Down
Loading