Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementing graphite protocol metricbeat module #4734

Merged
merged 2 commits into from
Jul 27, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta1...master[Check the HEAD di
*Metricbeat*

- Add support to exclude labels from kubernetes pod metadata. {pull}4757[4757]
- Add graphite protocol metricbeat module. {pull}4734[4734]

*Packetbeat*

Expand Down Expand Up @@ -171,6 +172,8 @@ https://github.com/elastic/beats/compare/v6.0.0-alpha2...v6.0.0-beta1[View commi
- Add `test modules` command, to test modules expected output. {pull}4656[4656]
- Add `processors` setting to metricbeat modules. {pull}4699[4699]

*Packetbeat*

*Winlogbeat*

- Add the ability to use LevelRaw if Level isn't populated in the event XML. {pull}4257[4257]
Expand Down
30 changes: 30 additions & 0 deletions metricbeat/docs/fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ grouped in the following categories:
* <<exported-fields-dropwizard>>
* <<exported-fields-elasticsearch>>
* <<exported-fields-golang>>
* <<exported-fields-graphite>>
* <<exported-fields-haproxy>>
* <<exported-fields-http>>
* <<exported-fields-jolokia>>
Expand Down Expand Up @@ -2788,6 +2789,35 @@ format: bytes
Bytes in non-idle span.


[[exported-fields-graphite]]
== graphite Fields

[]experimental
graphite Module



[float]
== graphite Fields




[float]
== server Fields

server



[float]
=== graphite.server.example

type: keyword

Example field


[[exported-fields-haproxy]]
== HAProxy Fields

Expand Down
41 changes: 41 additions & 0 deletions metricbeat/docs/modules/graphite.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
////
This file is generated! See scripts/docs_collector.py
////

[[metricbeat-module-graphite]]
== graphite Module

This is the graphite Module.



[float]
=== Example configuration

The graphite module supports the standard configuration options that are described
in <<configuration-metricbeat>>. Here is an example configuration:

[source,yaml]
----
metricbeat.modules:
- module: graphite
metricsets: ["server"]
enabled: true
# protocol: "udp"
# templates:
# - filter: "test.*.bash.*" # This would match metrics like test.localhost.bash.stats
# namespace: "test"
# template: ".host.shell.metric*" # test.localhost.bash.stats would become metric=stats and tags host=localhost,shell=bash
# delimiter: "_"

----

[float]
=== Metricsets

The following metricsets are available:

* <<metricbeat-metricset-graphite-server,server>>

include::graphite/server.asciidoc[]

19 changes: 19 additions & 0 deletions metricbeat/docs/modules/graphite/server.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
////
This file is generated! See scripts/docs_collector.py
////

[[metricbeat-metricset-graphite-server]]
include::../../../module/graphite/server/_meta/docs.asciidoc[]


==== Fields

For a description of each field in the metricset, see the
<<exported-fields-graphite,exported fields>> section.

Here is an example document generated by this metricset:

[source,json]
----
include::../../../module/graphite/server/_meta/data.json[]
----
2 changes: 2 additions & 0 deletions metricbeat/docs/modules_list.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ This file is generated! See scripts/docs_collector.py
* <<metricbeat-module-dropwizard,Dropwizard>>
* <<metricbeat-module-elasticsearch,Elasticsearch>>
* <<metricbeat-module-golang,Golang>>
* <<metricbeat-module-graphite,graphite>>
* <<metricbeat-module-haproxy,HAProxy>>
* <<metricbeat-module-http,HTTP>>
* <<metricbeat-module-jolokia,Jolokia>>
Expand Down Expand Up @@ -41,6 +42,7 @@ include::modules/docker.asciidoc[]
include::modules/dropwizard.asciidoc[]
include::modules/elasticsearch.asciidoc[]
include::modules/golang.asciidoc[]
include::modules/graphite.asciidoc[]
include::modules/haproxy.asciidoc[]
include::modules/http.asciidoc[]
include::modules/jolokia.asciidoc[]
Expand Down
28 changes: 28 additions & 0 deletions metricbeat/helper/server/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package server

import "github.com/elastic/beats/libbeat/common"

type Meta common.MapStr

const (
EventDataKey = "data"
)

// Server is an interface that can be used to implement servers which can accept data.
type Server interface {
// Start is used to start the server at a well defined port.
Start()
// Stop the server.
Stop()
// Get a channel of events.
GetEvents() chan Event
}

// Event is an interface that can be used to get the event and event source related information.
type Event interface {
// Get the raw bytes of the event.
GetEvent() common.MapStr
// Get any metadata associated with the data that was received. Ex: client IP for udp message,
// request/response headers for HTTP call.
GetMeta() Meta
}
15 changes: 15 additions & 0 deletions metricbeat/helper/server/tcp/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package tcp

type TcpConfig struct {
Host string `config:"host"`
Port int `config:"port"`
ReceiveBufferSize int `config:"receive_buffer_size"`
}

func defaultTcpConfig() TcpConfig {
return TcpConfig{
Host: "localhost",
Port: 2003,
ReceiveBufferSize: 1024,
}
}
104 changes: 104 additions & 0 deletions metricbeat/helper/server/tcp/tcp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package tcp

import (
"fmt"
"net"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/helper/server"
"github.com/elastic/beats/metricbeat/mb"
)

type TcpServer struct {
listener *net.TCPListener
receiveBufferSize int
done chan struct{}
eventQueue chan server.Event
}

type TcpEvent struct {
event common.MapStr
}

func (m *TcpEvent) GetEvent() common.MapStr {
return m.event
}

func (m *TcpEvent) GetMeta() server.Meta {
return server.Meta{}
}

func NewTcpServer(base mb.BaseMetricSet) (server.Server, error) {
config := defaultTcpConfig()
err := base.Module().UnpackConfig(&config)
if err != nil {
return nil, err
}

addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%d", config.Host, config.Port))

if err != nil {
return nil, err
}

listener, err := net.ListenTCP("tcp", addr)
if err != nil {
return nil, err
}

logp.Info("Started listening for TCP on: %s:%d", config.Host, config.Port)
return &TcpServer{
listener: listener,
receiveBufferSize: config.ReceiveBufferSize,
done: make(chan struct{}),
eventQueue: make(chan server.Event),
}, nil
}

func (g *TcpServer) Start() {
go g.WatchMetrics()
}

func (g *TcpServer) WatchMetrics() {
buffer := make([]byte, g.receiveBufferSize)
for {
select {
case <-g.done:
return
default:
}

conn, err := g.listener.Accept()
if err != nil {
logp.Err("Unable to accept connection due to error: %v", err)
continue
}
defer func() {
if conn != nil {
conn.Close()
}
}()

length, err := conn.Read(buffer)
if err != nil {
logp.Err("Error reading from buffer: %v", err.Error())
continue
}
g.eventQueue <- &TcpEvent{
event: common.MapStr{
server.EventDataKey: buffer[:length],
},
}
}
}

func (g *TcpServer) GetEvents() chan server.Event {
return g.eventQueue
}

func (g *TcpServer) Stop() {
close(g.done)
g.listener.Close()
close(g.eventQueue)
}
79 changes: 79 additions & 0 deletions metricbeat/helper/server/tcp/tcp_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// +build !integration

package tcp

import (
"fmt"
"net"
"testing"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/helper/server"
)

func GetTestTcpServer(host string, port int) (server.Server, error) {
addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%d", host, port))

if err != nil {
return nil, err
}

listener, err := net.ListenTCP("tcp", addr)
if err != nil {
return nil, err
}

logp.Info("Started listening for TCP on: %s:%d", host, port)
return &TcpServer{
listener: listener,
receiveBufferSize: 1024,
done: make(chan struct{}),
eventQueue: make(chan server.Event),
}, nil
}

func TestTcpServer(t *testing.T) {
host := "127.0.0.1"
port := 2003
svc, err := GetTestTcpServer(host, port)
if err != nil {
t.Error(err)
t.FailNow()
}

svc.Start()
defer svc.Stop()
writeToServer(t, "test1", host, port)
msg := <-svc.GetEvents()

assert.True(t, msg.GetEvent() != nil)
ok, _ := msg.GetEvent().HasKey("data")
assert.True(t, ok)
bytes, _ := msg.GetEvent()["data"].([]byte)
assert.True(t, string(bytes) == "test1")

}

func writeToServer(t *testing.T, message, host string, port int) {
servAddr := fmt.Sprintf("%s:%d", host, port)
tcpAddr, err := net.ResolveTCPAddr("tcp", servAddr)
if err != nil {
t.Error(err)
t.FailNow()
}

conn, err := net.DialTCP("tcp", nil, tcpAddr)
if err != nil {
t.Error(err)
t.FailNow()
}

defer conn.Close()
_, err = conn.Write([]byte(message))
if err != nil {
t.Error(err)
t.FailNow()
}
}
15 changes: 15 additions & 0 deletions metricbeat/helper/server/udp/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package udp

type UdpConfig struct {
Host string `config:"host"`
Port int `config:"port"`
ReceiveBufferSize int `config:"receive_buffer_size"`
}

func defaultUdpConfig() UdpConfig {
return UdpConfig{
Host: "localhost",
Port: 2003,
ReceiveBufferSize: 1024,
}
}
Loading