diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 2db0dcd77fa..0358220bc32 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -51,6 +51,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta1...master[Check the HEAD di *Metricbeat* - Add support to exclude labels from kubernetes pod metadata. {pull}4757[4757] +- Add graphite protocol metricbeat module. {pull}4734[4734] *Packetbeat* @@ -175,6 +176,8 @@ https://github.com/elastic/beats/compare/v6.0.0-alpha2...v6.0.0-beta1[View commi - Add `test modules` command, to test modules expected output. {pull}4656[4656] - Add `processors` setting to metricbeat modules. {pull}4699[4699] +*Packetbeat* + *Winlogbeat* - Add the ability to use LevelRaw if Level isn't populated in the event XML. {pull}4257[4257] diff --git a/metricbeat/docs/fields.asciidoc b/metricbeat/docs/fields.asciidoc index 822daaa6e7d..3f984f0087c 100644 --- a/metricbeat/docs/fields.asciidoc +++ b/metricbeat/docs/fields.asciidoc @@ -24,6 +24,7 @@ grouped in the following categories: * <> * <> * <> +* <> * <> * <> * <> @@ -2788,6 +2789,35 @@ format: bytes Bytes in non-idle span. +[[exported-fields-graphite]] +== graphite Fields + +[]experimental +graphite Module + + + +[float] +== graphite Fields + + + + +[float] +== server Fields + +server + + + +[float] +=== graphite.server.example + +type: keyword + +Example field + + [[exported-fields-haproxy]] == HAProxy Fields diff --git a/metricbeat/docs/modules/graphite.asciidoc b/metricbeat/docs/modules/graphite.asciidoc new file mode 100644 index 00000000000..0a03eb79f80 --- /dev/null +++ b/metricbeat/docs/modules/graphite.asciidoc @@ -0,0 +1,41 @@ +//// +This file is generated! See scripts/docs_collector.py +//// + +[[metricbeat-module-graphite]] +== graphite Module + +This is the graphite Module. + + + +[float] +=== Example configuration + +The graphite module supports the standard configuration options that are described +in <>. Here is an example configuration: + +[source,yaml] +---- +metricbeat.modules: +- module: graphite + metricsets: ["server"] + enabled: true +# protocol: "udp" +# templates: +# - filter: "test.*.bash.*" # This would match metrics like test.localhost.bash.stats +# namespace: "test" +# template: ".host.shell.metric*" # test.localhost.bash.stats would become metric=stats and tags host=localhost,shell=bash +# delimiter: "_" + +---- + +[float] +=== Metricsets + +The following metricsets are available: + +* <> + +include::graphite/server.asciidoc[] + diff --git a/metricbeat/docs/modules/graphite/server.asciidoc b/metricbeat/docs/modules/graphite/server.asciidoc new file mode 100644 index 00000000000..beba3cb5cbe --- /dev/null +++ b/metricbeat/docs/modules/graphite/server.asciidoc @@ -0,0 +1,19 @@ +//// +This file is generated! See scripts/docs_collector.py +//// + +[[metricbeat-metricset-graphite-server]] +include::../../../module/graphite/server/_meta/docs.asciidoc[] + + +==== Fields + +For a description of each field in the metricset, see the +<> section. + +Here is an example document generated by this metricset: + +[source,json] +---- +include::../../../module/graphite/server/_meta/data.json[] +---- diff --git a/metricbeat/docs/modules_list.asciidoc b/metricbeat/docs/modules_list.asciidoc index 9952718e900..61cef2e27ca 100644 --- a/metricbeat/docs/modules_list.asciidoc +++ b/metricbeat/docs/modules_list.asciidoc @@ -10,6 +10,7 @@ This file is generated! See scripts/docs_collector.py * <> * <> * <> + * <> * <> * <> * <> @@ -41,6 +42,7 @@ include::modules/docker.asciidoc[] include::modules/dropwizard.asciidoc[] include::modules/elasticsearch.asciidoc[] include::modules/golang.asciidoc[] +include::modules/graphite.asciidoc[] include::modules/haproxy.asciidoc[] include::modules/http.asciidoc[] include::modules/jolokia.asciidoc[] diff --git a/metricbeat/helper/server/server.go b/metricbeat/helper/server/server.go new file mode 100644 index 00000000000..cac93797589 --- /dev/null +++ b/metricbeat/helper/server/server.go @@ -0,0 +1,28 @@ +package server + +import "github.com/elastic/beats/libbeat/common" + +type Meta common.MapStr + +const ( + EventDataKey = "data" +) + +// Server is an interface that can be used to implement servers which can accept data. +type Server interface { + // Start is used to start the server at a well defined port. + Start() + // Stop the server. + Stop() + // Get a channel of events. + GetEvents() chan Event +} + +// Event is an interface that can be used to get the event and event source related information. +type Event interface { + // Get the raw bytes of the event. + GetEvent() common.MapStr + // Get any metadata associated with the data that was received. Ex: client IP for udp message, + // request/response headers for HTTP call. + GetMeta() Meta +} diff --git a/metricbeat/helper/server/tcp/config.go b/metricbeat/helper/server/tcp/config.go new file mode 100644 index 00000000000..1b240b27842 --- /dev/null +++ b/metricbeat/helper/server/tcp/config.go @@ -0,0 +1,15 @@ +package tcp + +type TcpConfig struct { + Host string `config:"host"` + Port int `config:"port"` + ReceiveBufferSize int `config:"receive_buffer_size"` +} + +func defaultTcpConfig() TcpConfig { + return TcpConfig{ + Host: "localhost", + Port: 2003, + ReceiveBufferSize: 1024, + } +} diff --git a/metricbeat/helper/server/tcp/tcp.go b/metricbeat/helper/server/tcp/tcp.go new file mode 100644 index 00000000000..a2ea7819779 --- /dev/null +++ b/metricbeat/helper/server/tcp/tcp.go @@ -0,0 +1,104 @@ +package tcp + +import ( + "fmt" + "net" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/metricbeat/helper/server" + "github.com/elastic/beats/metricbeat/mb" +) + +type TcpServer struct { + listener *net.TCPListener + receiveBufferSize int + done chan struct{} + eventQueue chan server.Event +} + +type TcpEvent struct { + event common.MapStr +} + +func (m *TcpEvent) GetEvent() common.MapStr { + return m.event +} + +func (m *TcpEvent) GetMeta() server.Meta { + return server.Meta{} +} + +func NewTcpServer(base mb.BaseMetricSet) (server.Server, error) { + config := defaultTcpConfig() + err := base.Module().UnpackConfig(&config) + if err != nil { + return nil, err + } + + addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%d", config.Host, config.Port)) + + if err != nil { + return nil, err + } + + listener, err := net.ListenTCP("tcp", addr) + if err != nil { + return nil, err + } + + logp.Info("Started listening for TCP on: %s:%d", config.Host, config.Port) + return &TcpServer{ + listener: listener, + receiveBufferSize: config.ReceiveBufferSize, + done: make(chan struct{}), + eventQueue: make(chan server.Event), + }, nil +} + +func (g *TcpServer) Start() { + go g.WatchMetrics() +} + +func (g *TcpServer) WatchMetrics() { + buffer := make([]byte, g.receiveBufferSize) + for { + select { + case <-g.done: + return + default: + } + + conn, err := g.listener.Accept() + if err != nil { + logp.Err("Unable to accept connection due to error: %v", err) + continue + } + defer func() { + if conn != nil { + conn.Close() + } + }() + + length, err := conn.Read(buffer) + if err != nil { + logp.Err("Error reading from buffer: %v", err.Error()) + continue + } + g.eventQueue <- &TcpEvent{ + event: common.MapStr{ + server.EventDataKey: buffer[:length], + }, + } + } +} + +func (g *TcpServer) GetEvents() chan server.Event { + return g.eventQueue +} + +func (g *TcpServer) Stop() { + close(g.done) + g.listener.Close() + close(g.eventQueue) +} diff --git a/metricbeat/helper/server/tcp/tcp_test.go b/metricbeat/helper/server/tcp/tcp_test.go new file mode 100644 index 00000000000..6fd1a55c5a7 --- /dev/null +++ b/metricbeat/helper/server/tcp/tcp_test.go @@ -0,0 +1,79 @@ +// +build !integration + +package tcp + +import ( + "fmt" + "net" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/metricbeat/helper/server" +) + +func GetTestTcpServer(host string, port int) (server.Server, error) { + addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%d", host, port)) + + if err != nil { + return nil, err + } + + listener, err := net.ListenTCP("tcp", addr) + if err != nil { + return nil, err + } + + logp.Info("Started listening for TCP on: %s:%d", host, port) + return &TcpServer{ + listener: listener, + receiveBufferSize: 1024, + done: make(chan struct{}), + eventQueue: make(chan server.Event), + }, nil +} + +func TestTcpServer(t *testing.T) { + host := "127.0.0.1" + port := 2003 + svc, err := GetTestTcpServer(host, port) + if err != nil { + t.Error(err) + t.FailNow() + } + + svc.Start() + defer svc.Stop() + writeToServer(t, "test1", host, port) + msg := <-svc.GetEvents() + + assert.True(t, msg.GetEvent() != nil) + ok, _ := msg.GetEvent().HasKey("data") + assert.True(t, ok) + bytes, _ := msg.GetEvent()["data"].([]byte) + assert.True(t, string(bytes) == "test1") + +} + +func writeToServer(t *testing.T, message, host string, port int) { + servAddr := fmt.Sprintf("%s:%d", host, port) + tcpAddr, err := net.ResolveTCPAddr("tcp", servAddr) + if err != nil { + t.Error(err) + t.FailNow() + } + + conn, err := net.DialTCP("tcp", nil, tcpAddr) + if err != nil { + t.Error(err) + t.FailNow() + } + + defer conn.Close() + _, err = conn.Write([]byte(message)) + if err != nil { + t.Error(err) + t.FailNow() + } +} diff --git a/metricbeat/helper/server/udp/config.go b/metricbeat/helper/server/udp/config.go new file mode 100644 index 00000000000..fdce2be222a --- /dev/null +++ b/metricbeat/helper/server/udp/config.go @@ -0,0 +1,15 @@ +package udp + +type UdpConfig struct { + Host string `config:"host"` + Port int `config:"port"` + ReceiveBufferSize int `config:"receive_buffer_size"` +} + +func defaultUdpConfig() UdpConfig { + return UdpConfig{ + Host: "localhost", + Port: 2003, + ReceiveBufferSize: 1024, + } +} diff --git a/metricbeat/helper/server/udp/udp.go b/metricbeat/helper/server/udp/udp.go new file mode 100644 index 00000000000..fc476b70207 --- /dev/null +++ b/metricbeat/helper/server/udp/udp.go @@ -0,0 +1,98 @@ +package udp + +import ( + "fmt" + "net" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/metricbeat/helper/server" + "github.com/elastic/beats/metricbeat/mb" +) + +type UdpServer struct { + listener *net.UDPConn + receiveBufferSize int + done chan struct{} + eventQueue chan server.Event +} + +type UdpEvent struct { + event common.MapStr + meta server.Meta +} + +func (u *UdpEvent) GetEvent() common.MapStr { + return u.event +} + +func (u *UdpEvent) GetMeta() server.Meta { + return u.meta +} + +func NewUdpServer(base mb.BaseMetricSet) (server.Server, error) { + config := defaultUdpConfig() + err := base.Module().UnpackConfig(&config) + if err != nil { + return nil, err + } + + addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", config.Host, config.Port)) + + if err != nil { + return nil, err + } + + listener, err := net.ListenUDP("udp", addr) + if err != nil { + return nil, err + } + + logp.Info("Started listening for UDP on: %s:%d", config.Host, config.Port) + return &UdpServer{ + listener: listener, + receiveBufferSize: config.ReceiveBufferSize, + done: make(chan struct{}), + eventQueue: make(chan server.Event), + }, nil +} + +func (g *UdpServer) Start() { + go g.WatchMetrics() +} + +func (g *UdpServer) WatchMetrics() { + buffer := make([]byte, g.receiveBufferSize) + for { + select { + case <-g.done: + return + default: + } + + length, addr, err := g.listener.ReadFromUDP(buffer) + if err != nil { + logp.Err("Error reading from buffer: %v", err.Error()) + continue + } + + g.eventQueue <- &UdpEvent{ + event: common.MapStr{ + server.EventDataKey: buffer[:length], + }, + meta: server.Meta{ + "client_ip": addr.IP.String(), + }, + } + } +} + +func (g *UdpServer) GetEvents() chan server.Event { + return g.eventQueue +} + +func (g *UdpServer) Stop() { + close(g.done) + g.listener.Close() + close(g.eventQueue) +} diff --git a/metricbeat/helper/server/udp/udp_test.go b/metricbeat/helper/server/udp/udp_test.go new file mode 100644 index 00000000000..737f88dd4e5 --- /dev/null +++ b/metricbeat/helper/server/udp/udp_test.go @@ -0,0 +1,72 @@ +// +build !integration + +package udp + +import ( + "fmt" + "net" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/metricbeat/helper/server" +) + +func GetTestUdpServer(host string, port int) (server.Server, error) { + addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", host, port)) + + if err != nil { + return nil, err + } + + listener, err := net.ListenUDP("udp", addr) + if err != nil { + return nil, err + } + + logp.Info("Started listening for UDP on: %s:%d", host, port) + return &UdpServer{ + listener: listener, + receiveBufferSize: 1024, + done: make(chan struct{}), + eventQueue: make(chan server.Event), + }, nil +} + +func TestUdpServer(t *testing.T) { + host := "127.0.0.1" + port := 2003 + svc, err := GetTestUdpServer(host, port) + if err != nil { + t.Error(err) + t.FailNow() + } + + svc.Start() + defer svc.Stop() + writeToServer(t, "test1", host, port) + msg := <-svc.GetEvents() + + assert.True(t, msg.GetEvent() != nil) + ok, _ := msg.GetEvent().HasKey("data") + assert.True(t, ok) + bytes, _ := msg.GetEvent()["data"].([]byte) + assert.True(t, string(bytes) == "test1") +} + +func writeToServer(t *testing.T, message, host string, port int) { + servAddr := fmt.Sprintf("%s:%d", host, port) + conn, err := net.Dial("udp", servAddr) + if err != nil { + t.Error(err) + t.FailNow() + } + + defer conn.Close() + _, err = conn.Write([]byte(message)) + if err != nil { + t.Error(err) + t.FailNow() + } +} diff --git a/metricbeat/include/list.go b/metricbeat/include/list.go index b6973935c42..1d02db49827 100644 --- a/metricbeat/include/list.go +++ b/metricbeat/include/list.go @@ -38,6 +38,8 @@ import ( _ "github.com/elastic/beats/metricbeat/module/golang" _ "github.com/elastic/beats/metricbeat/module/golang/expvar" _ "github.com/elastic/beats/metricbeat/module/golang/heap" + _ "github.com/elastic/beats/metricbeat/module/graphite" + _ "github.com/elastic/beats/metricbeat/module/graphite/server" _ "github.com/elastic/beats/metricbeat/module/haproxy" _ "github.com/elastic/beats/metricbeat/module/haproxy/info" _ "github.com/elastic/beats/metricbeat/module/haproxy/stat" diff --git a/metricbeat/metricbeat.reference.yml b/metricbeat/metricbeat.reference.yml index 4638e747573..2061b943aa2 100644 --- a/metricbeat/metricbeat.reference.yml +++ b/metricbeat/metricbeat.reference.yml @@ -155,6 +155,18 @@ metricbeat.modules: namespace: "example" path: "/debug/vars" +#------------------------------ graphite Module ------------------------------ +- module: graphite + metricsets: ["server"] + enabled: true +# protocol: "udp" +# templates: +# - filter: "test.*.bash.*" # This would match metrics like test.localhost.bash.stats +# namespace: "test" +# template: ".host.shell.metric*" # test.localhost.bash.stats would become metric=stats and tags host=localhost,shell=bash +# delimiter: "_" + + #------------------------------- HAProxy Module ------------------------------ - module: haproxy metricsets: ["info", "stat"] diff --git a/metricbeat/module/graphite/_meta/config.yml b/metricbeat/module/graphite/_meta/config.yml new file mode 100644 index 00000000000..b5318ecd7ad --- /dev/null +++ b/metricbeat/module/graphite/_meta/config.yml @@ -0,0 +1,10 @@ +- module: graphite + metricsets: ["server"] + enabled: true +# protocol: "udp" +# templates: +# - filter: "test.*.bash.*" # This would match metrics like test.localhost.bash.stats +# namespace: "test" +# template: ".host.shell.metric*" # test.localhost.bash.stats would become metric=stats and tags host=localhost,shell=bash +# delimiter: "_" + diff --git a/metricbeat/module/graphite/_meta/docs.asciidoc b/metricbeat/module/graphite/_meta/docs.asciidoc new file mode 100644 index 00000000000..a53f5404302 --- /dev/null +++ b/metricbeat/module/graphite/_meta/docs.asciidoc @@ -0,0 +1,4 @@ +== graphite Module + +This is the graphite Module. + diff --git a/metricbeat/module/graphite/_meta/fields.yml b/metricbeat/module/graphite/_meta/fields.yml new file mode 100644 index 00000000000..2f8eb11ae20 --- /dev/null +++ b/metricbeat/module/graphite/_meta/fields.yml @@ -0,0 +1,11 @@ +- key: graphite + title: "graphite" + description: > + []experimental + + graphite Module + fields: + - name: graphite + type: group + description: > + fields: diff --git a/metricbeat/module/graphite/doc.go b/metricbeat/module/graphite/doc.go new file mode 100644 index 00000000000..67de69d767f --- /dev/null +++ b/metricbeat/module/graphite/doc.go @@ -0,0 +1,4 @@ +/* +Package graphite is a Metricbeat module that contains MetricSets. +*/ +package graphite diff --git a/metricbeat/module/graphite/server/_meta/data.json b/metricbeat/module/graphite/server/_meta/data.json new file mode 100644 index 00000000000..3c7e4f4fbcc --- /dev/null +++ b/metricbeat/module/graphite/server/_meta/data.json @@ -0,0 +1,19 @@ +{ + "@timestamp":"2016-05-23T08:05:34.853Z", + "beat":{ + "hostname":"beathost", + "name":"beathost" + }, + "metricset":{ + "host":"localhost", + "module":"graphite", + "name":"server", + "rtt":44269 + }, + "graphite":{ + "collector":{ + "example": "collector" + } + }, + "type":"metricsets" +} diff --git a/metricbeat/module/graphite/server/_meta/docs.asciidoc b/metricbeat/module/graphite/server/_meta/docs.asciidoc new file mode 100644 index 00000000000..8b3f4138661 --- /dev/null +++ b/metricbeat/module/graphite/server/_meta/docs.asciidoc @@ -0,0 +1,3 @@ +=== graphite server MetricSet + +This is the server metricset of the module graphite. diff --git a/metricbeat/module/graphite/server/_meta/fields.yml b/metricbeat/module/graphite/server/_meta/fields.yml new file mode 100644 index 00000000000..9dadb8340ce --- /dev/null +++ b/metricbeat/module/graphite/server/_meta/fields.yml @@ -0,0 +1,9 @@ +- name: server + type: group + description: > + server + fields: + - name: example + type: keyword + description: > + Example field diff --git a/metricbeat/module/graphite/server/config.go b/metricbeat/module/graphite/server/config.go new file mode 100644 index 00000000000..c1419e87a35 --- /dev/null +++ b/metricbeat/module/graphite/server/config.go @@ -0,0 +1,62 @@ +package server + +import ( + "errors" +) + +const ( + defaultDelimiter = "." +) + +type graphiteCollectorConfig struct { + Protocol string `config:"protocol"` + Templates []templateConfig `config:"templates"` + DefaultTemplate templateConfig `config:"default_template"` +} + +type templateConfig struct { + Filter string `config:"filter"` + Template string `config:"template"` + Namespace string `config:"namespace"` + Delimiter string `config:"delimiter"` + Tags map[string]string `config:"tags"` +} + +func defaultGraphiteCollectorConfig() graphiteCollectorConfig { + return graphiteCollectorConfig{ + Protocol: "udp", + DefaultTemplate: templateConfig{ + Filter: "*", + Template: "metric*", + Namespace: "graphite", + Delimiter: ".", + }, + } +} + +func (c graphiteCollectorConfig) Validate() error { + if c.Protocol != "tcp" && c.Protocol != "udp" { + return errors.New("`protocol` can only be tcp or udp") + } + return nil +} + +func (t *templateConfig) Validate() error { + if t.Namespace == "" { + return errors.New("`namespace` can not be empty in template configuration") + } + + if t.Filter == "" { + return errors.New("`filter` can not be empty in template configuration") + } + + if t.Template == "" { + return errors.New("`template` can not be empty in template configuration") + } + + if t.Delimiter == "" { + t.Delimiter = defaultDelimiter + } + + return nil +} diff --git a/metricbeat/module/graphite/server/data.go b/metricbeat/module/graphite/server/data.go new file mode 100644 index 00000000000..2e6a8b6ae27 --- /dev/null +++ b/metricbeat/module/graphite/server/data.go @@ -0,0 +1,165 @@ +package server + +import ( + "errors" + "math" + "strconv" + "strings" + "sync" + "time" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/metricbeat/mb" +) + +type template struct { + Namespace string + Delimiter string + Parts []string + Tags map[string]string +} + +type metricProcessor struct { + templates *tree + defaultTemplate template + sync.RWMutex +} + +func NewMetricProcessor(templates []templateConfig, defaultTemplate templateConfig) *metricProcessor { + templateTree := NewTree(getTemplateFromConfig(defaultTemplate)) + for _, t := range templates { + templateTree.Insert(t.Filter, getTemplateFromConfig(t)) + } + + return &metricProcessor{ + templates: templateTree, + defaultTemplate: getTemplateFromConfig(defaultTemplate), + } +} + +func getTemplateFromConfig(config templateConfig) template { + return template{ + Namespace: config.Namespace, + Tags: config.Tags, + Delimiter: config.Delimiter, + Parts: strings.Split(config.Template, "."), + } +} + +func (m *metricProcessor) AddTemplate(t templateConfig) { + m.Lock() + template := getTemplateFromConfig(t) + m.templates.Insert(t.Filter, template) + m.Unlock() +} + +func (m *metricProcessor) RemoveTemplate(template templateConfig) { + m.Lock() + m.templates.Delete(template.Filter) + m.Unlock() +} + +func (m *metricProcessor) Process(message string) (common.MapStr, error) { + metric, timestamp, value, err := m.splitMetric(message) + if err != nil { + return nil, err + } + + parts := strings.Split(metric, ".") + t := m.FindTemplate(parts) + + var name, namespace string + var tags common.MapStr + if t == nil { + name, tags = m.defaultTemplate.Apply(parts) + namespace = m.defaultTemplate.Namespace + } else { + name, tags = t.Apply(parts) + namespace = t.Namespace + } + + event := common.MapStr{ + "@timestamp": timestamp, + name: value, + mb.NamespaceKey: namespace, + } + if len(tags) != 0 { + event["tag"] = tags + } + return event, nil +} + +func (m *metricProcessor) FindTemplate(metric []string) *template { + return m.templates.Search(metric) +} + +func (m *metricProcessor) splitMetric(metric string) (string, common.Time, float64, error) { + var metricName string + var timestamp common.Time + var value float64 + + parts := strings.Fields(metric) + currentTime := common.Time(time.Now()) + if len(parts) < 2 { + return "", currentTime, 0, errors.New("Message not in expected format") + } else { + metricName = parts[0] + val, err := strconv.ParseFloat(parts[1], 64) + if err != nil { + return "", currentTime, 0, errors.New("Unable to parse metric value") + } else { + value = val + } + } + + if len(parts) == 3 { + if parts[2] == "N" { + timestamp = currentTime + } + ts, err := strconv.ParseFloat(parts[2], 64) + if err != nil { + return "", currentTime, 0, errors.New("Unable to parse timestamp") + } + + if ts != -1 { + timestamp = common.Time(time.Unix(int64(ts), int64((ts-math.Floor(ts))*float64(time.Second)))) + } else { + timestamp = currentTime + } + + } else { + timestamp = currentTime + } + + return metricName, timestamp, value, nil +} + +func (t *template) Apply(parts []string) (string, common.MapStr) { + tags := make(common.MapStr) + + metric := make([]string, 0) + for tagKey, tagVal := range t.Tags { + tags[tagKey] = tagVal + } + + tagsMap := make(map[string][]string) + for i := 0; i < len(t.Parts); i++ { + if t.Parts[i] == "metric" { + metric = append(metric, parts[i]) + } else if t.Parts[i] == "metric*" { + metric = append(metric, parts[i:]...) + } else if t.Parts[i] != "" { + tagsMap[t.Parts[i]] = append(tagsMap[t.Parts[i]], parts[i]) + } + } + + for key, value := range tagsMap { + tags[key] = strings.Join(value, t.Delimiter) + } + + if len(metric) == 0 { + return "", tags + } else { + return strings.Join(metric, t.Delimiter), tags + } +} diff --git a/metricbeat/module/graphite/server/data_test.go b/metricbeat/module/graphite/server/data_test.go new file mode 100644 index 00000000000..cb93978e056 --- /dev/null +++ b/metricbeat/module/graphite/server/data_test.go @@ -0,0 +1,92 @@ +// +build !integration + +package server + +import ( + "math" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/common" +) + +func GetMetricProcessor() *metricProcessor { + templates := []templateConfig{ + { + Namespace: "foo", + Filter: "test.localhost.*", + Template: ".host.shell.metric", + Delimiter: ".", + }, + { + Namespace: "foo", + Filter: "test.xyz.*", + Template: ".host.metric*", + Delimiter: "_", + Tags: map[string]string{ + "a": "b", + }, + }, + } + + defaultTemplate := defaultGraphiteCollectorConfig().DefaultTemplate + return NewMetricProcessor(templates, defaultTemplate) +} + +func TestMetricProcessorAddTemplate(t *testing.T) { + processor := GetMetricProcessor() + temp := templateConfig{ + Namespace: "xyz", + Filter: "a.b.*", + Template: ".host.shell.metric", + Delimiter: ".", + } + processor.AddTemplate(temp) + out := processor.templates.Search([]string{"a", "b", "c"}) + assert.NotNil(t, out) + assert.Equal(t, out.Namespace, temp.Namespace) +} + +func TestMetricProcessorDeleteTemplate(t *testing.T) { + processor := GetMetricProcessor() + temp := templateConfig{ + Namespace: "xyz", + Filter: "a.b.*", + Template: ".host.shell.metric", + Delimiter: ".", + } + processor.AddTemplate(temp) + processor.RemoveTemplate(temp) + out := processor.templates.Search([]string{"a", "b", "c"}) + assert.Nil(t, out) + +} + +func TestMetricProcessorProcess(t *testing.T) { + processor := GetMetricProcessor() + event, err := processor.Process("test.localhost.bash.stats 42 1500934723") + assert.Nil(t, err) + assert.NotNil(t, event) + + tag := event["tag"].(common.MapStr) + assert.Equal(t, len(tag), 2) + assert.Equal(t, tag["host"], "localhost") + assert.Equal(t, tag["shell"], "bash") + + assert.NotNil(t, event["stats"]) + assert.Equal(t, event["stats"], float64(42)) + + ts := float64(1500934723) + timestamp := common.Time(time.Unix(int64(ts), int64((ts-math.Floor(ts))*float64(time.Second)))) + + assert.Equal(t, event["@timestamp"], timestamp) + + event, err = processor.Process("test.localhost.bash.stats 42") + assert.Nil(t, err) + assert.NotNil(t, event) + + assert.NotNil(t, event["stats"]) + assert.Equal(t, event["stats"], float64(42)) +} diff --git a/metricbeat/module/graphite/server/server.go b/metricbeat/module/graphite/server/server.go new file mode 100644 index 00000000000..37e01fbd2dc --- /dev/null +++ b/metricbeat/module/graphite/server/server.go @@ -0,0 +1,88 @@ +package server + +import ( + "github.com/elastic/beats/libbeat/common/cfgwarn" + serverhelper "github.com/elastic/beats/metricbeat/helper/server" + "github.com/elastic/beats/metricbeat/helper/server/tcp" + "github.com/elastic/beats/metricbeat/helper/server/udp" + "github.com/elastic/beats/metricbeat/mb" +) + +// init registers the MetricSet with the central registry. +// The New method will be called after the setup of the module and before starting to fetch data +func init() { + if err := mb.Registry.AddMetricSet("graphite", "server", New); err != nil { + panic(err) + } +} + +// MetricSet type defines all fields of the MetricSet +// As a minimum it must inherit the mb.BaseMetricSet fields, but can be extended with +// additional entries. These variables can be used to persist data or configuration between +// multiple fetch calls. +type MetricSet struct { + mb.BaseMetricSet + server serverhelper.Server + processor *metricProcessor +} + +// New create a new instance of the MetricSet +// Part of new is also setting up the configuration by processing additional +// configuration entries if needed. +func New(base mb.BaseMetricSet) (mb.MetricSet, error) { + cfgwarn.Experimental("The graphite server metricset is experimental") + + config := defaultGraphiteCollectorConfig() + if err := base.Module().UnpackConfig(&config); err != nil { + return nil, err + } + + var s serverhelper.Server + var err error + if config.Protocol == "tcp" { + s, err = tcp.NewTcpServer(base) + } else { + s, err = udp.NewUdpServer(base) + } + + if err != nil { + return nil, err + } + + processor := NewMetricProcessor(config.Templates, config.DefaultTemplate) + + return &MetricSet{ + BaseMetricSet: base, + server: s, + processor: processor, + }, nil +} + +// Run method provides the Graphite server with a reporter with which events can be reported. +func (m *MetricSet) Run(reporter mb.PushReporter) { + // Start event watcher + m.server.Start() + + for { + select { + case <-reporter.Done(): + m.server.Stop() + return + case msg := <-m.server.GetEvents(): + input := msg.GetEvent() + bytesRaw, ok := input[serverhelper.EventDataKey] + if ok { + bytes, ok := bytesRaw.([]byte) + if ok && len(bytes) != 0 { + event, err := m.processor.Process(string(bytes)) + if err != nil { + reporter.Error(err) + } else { + reporter.Event(event) + } + } + } + + } + } +} diff --git a/metricbeat/module/graphite/server/tree.go b/metricbeat/module/graphite/server/tree.go new file mode 100644 index 00000000000..cd84b4db752 --- /dev/null +++ b/metricbeat/module/graphite/server/tree.go @@ -0,0 +1,154 @@ +package server + +import ( + "strings" +) + +type tree struct { + root *node // Root node +} + +// node is a single element within the tree +type node struct { + parent *node + entry *entry // entry + children map[string]*node // Children nodes +} + +// entry represents the key-value pair contained within nodes +type entry struct { + key string + value *template +} + +func (n *node) FindChild(key string) *node { + child, ok := n.children[key] + if ok { + return child + } + return nil +} + +func (n *node) AddChild(key string) *node { + temp := &node{ + parent: n, + children: make(map[string]*node), + } + + n.children[key] = temp + return temp +} + +func (n *node) GetTemplate() *template { + if n.entry != nil { + return n.entry.value + } + + return nil +} + +func (n *node) Search(parts []string) *template { + if len(parts) == 0 || len(n.children) == 0 { + return n.GetTemplate() + } + child := n.FindChild(parts[0]) + if child == nil { + child = n.FindChild("*") + } + + if child != nil { + return child.Search(parts[1:]) + } + + return n.GetTemplate() +} + +func (t *tree) Insert(filter string, template template) { + cur := t.root + parts := strings.Split(filter, ".") + for _, part := range parts { + child := cur.FindChild(part) + if child == nil { + child = cur.AddChild(part) + if child != nil && part == "*" { + child.entry = cur.entry + } + } + cur = child + } + + if cur != nil { + cur.entry = &entry{ + key: parts[len(parts)-1], + value: &template, + } + } +} + +func (t *tree) Search(parts []string) *template { + return t.root.Search(parts) +} + +func (t *tree) Delete(filter string) { + parts := strings.Split(filter, ".") + cur := t.root + for _, part := range parts { + child := cur.FindChild(part) + if child == nil { + // entry does not exist + return + } + cur = child + } + + // we are in the last element at this point + if cur != nil { + // There are more entries, so just make the template nil and make all subsequent '*' templates nil + if len(cur.children) != 0 { + cur.entry = nil + doBreak := false + temp := cur + for doBreak == false { + child := temp.FindChild("*") + if child != nil { + child.entry = nil + temp = child + } else { + doBreak = true + } + } + } else { + // Keep removing parts till there is no more childless entry + temp := cur + length := len(parts) + for temp != t.root { + parent := temp.parent + // Remove only if there is only one child for the parent + if len(parent.children) == 1 { + delete(parent.children, parts[length-1]) + temp = parent + length = length - 1 + } else { + break + } + + } + + } + } +} + +func NewTree(defaultTemplate template) *tree { + root := &node{ + entry: &entry{ + key: "*", + value: &defaultTemplate, + }, + children: make(map[string]*node), + parent: nil, + } + + return &tree{ + root: root, + } +} diff --git a/metricbeat/module/graphite/server/tree_test.go b/metricbeat/module/graphite/server/tree_test.go new file mode 100644 index 00000000000..ea2a87ba687 --- /dev/null +++ b/metricbeat/module/graphite/server/tree_test.go @@ -0,0 +1,99 @@ +// +build !integration + +package server + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func NewTestTree() *tree { + defaultTemplate := template{ + Parts: []string{"metric*"}, + Namespace: "foo", + Delimiter: ".", + } + + return NewTree(defaultTemplate) +} +func TestTreeInsert(t *testing.T) { + test := NewTestTree() + temp := template{ + Delimiter: "_", + Namespace: "foo", + Parts: []string{"", "host", "metric*"}, + } + test.Insert("test.localhost.*", temp) + + assert.Equal(t, len(test.root.children), 1) + child := test.root.children["test"] + assert.NotNil(t, child) + assert.Nil(t, child.GetTemplate()) + + cur := child + assert.Equal(t, len(cur.children), 1) + child = cur.children["localhost"] + assert.NotNil(t, child) + assert.Nil(t, child.GetTemplate()) + + cur = child + assert.Equal(t, len(cur.children), 1) + child = cur.children["*"] + assert.NotNil(t, child) + assert.NotNil(t, child.GetTemplate()) + assert.Equal(t, &temp, child.GetTemplate()) + + cur = child + assert.Equal(t, len(cur.children), 0) + test.Insert("test.localhost.*.foo", temp) + assert.Equal(t, len(cur.children), 1) + + test.Insert("a.b.c.d", temp) + assert.Equal(t, len(test.root.children), 2) +} + +func TestTreeSearch(t *testing.T) { + test := NewTestTree() + temp := template{ + Delimiter: "_", + Namespace: "foo", + Parts: []string{"", "host", "metric*"}, + } + test.Insert("test.localhost.*", temp) + + // Search for a valid scenario + outTemp := test.Search([]string{"test", "localhost", "bash", "stats"}) + assert.NotNil(t, outTemp) + assert.Equal(t, outTemp, &temp) + + // Search for a case where only half the tree is traversed and there is no entry + outTemp = test.Search([]string{"test"}) + assert.Nil(t, outTemp) + + // Search for a default case where root data is returned + outTemp = test.Search([]string{"a.b.c.d"}) + assert.NotNil(t, outTemp) + assert.Equal(t, outTemp, test.root.entry.value) +} + +func TestTreeDelete(t *testing.T) { + test := NewTestTree() + temp := template{ + Delimiter: "_", + Namespace: "foo", + Parts: []string{"", "host", "metric*"}, + } + test.Insert("test.localhost.*", temp) + test.Delete("test.localhost.*") + + assert.Equal(t, len(test.root.children), 0) + + test.Insert("test.localhost.*", temp) + test.Insert("test.*", temp) + test.Delete("test.*") + + assert.Equal(t, len(test.root.children), 1) + assert.NotNil(t, test.root.FindChild("test")) + +} diff --git a/metricbeat/module/kubernetes/event/config.go b/metricbeat/module/kubernetes/event/config.go index 551976fd5e2..5c65f2e2089 100644 --- a/metricbeat/module/kubernetes/event/config.go +++ b/metricbeat/module/kubernetes/event/config.go @@ -3,8 +3,6 @@ package event import ( "errors" "time" - - "github.com/elastic/beats/libbeat/common" ) type kubeEventsConfig struct { @@ -18,8 +16,6 @@ type Enabled struct { Enabled bool `config:"enabled"` } -type PluginConfig []map[string]common.Config - func defaultKuberentesEventsConfig() kubeEventsConfig { return kubeEventsConfig{ InCluster: true, diff --git a/metricbeat/modules.d/graphite.yml.disabled b/metricbeat/modules.d/graphite.yml.disabled new file mode 100644 index 00000000000..b5318ecd7ad --- /dev/null +++ b/metricbeat/modules.d/graphite.yml.disabled @@ -0,0 +1,10 @@ +- module: graphite + metricsets: ["server"] + enabled: true +# protocol: "udp" +# templates: +# - filter: "test.*.bash.*" # This would match metrics like test.localhost.bash.stats +# namespace: "test" +# template: ".host.shell.metric*" # test.localhost.bash.stats would become metric=stats and tags host=localhost,shell=bash +# delimiter: "_" +