Skip to content

Commit

Permalink
Implementing graphite protocol metricbeat module (elastic#4734)
Browse files Browse the repository at this point in the history
* Implementing graphite protocol metricbeat module
  • Loading branch information
vjsamuel authored and exekias committed Jul 27, 2017
1 parent 5750cc0 commit 38eb3eb
Show file tree
Hide file tree
Showing 29 changed files with 1,250 additions and 4 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta1...master[Check the HEAD di
*Metricbeat*

- Add support to exclude labels from kubernetes pod metadata. {pull}4757[4757]
- Add graphite protocol metricbeat module. {pull}4734[4734]

*Packetbeat*

Expand Down Expand Up @@ -175,6 +176,8 @@ https://github.com/elastic/beats/compare/v6.0.0-alpha2...v6.0.0-beta1[View commi
- Add `test modules` command, to test modules expected output. {pull}4656[4656]
- Add `processors` setting to metricbeat modules. {pull}4699[4699]
*Packetbeat*
*Winlogbeat*
- Add the ability to use LevelRaw if Level isn't populated in the event XML. {pull}4257[4257]
Expand Down
30 changes: 30 additions & 0 deletions metricbeat/docs/fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ grouped in the following categories:
* <<exported-fields-dropwizard>>
* <<exported-fields-elasticsearch>>
* <<exported-fields-golang>>
* <<exported-fields-graphite>>
* <<exported-fields-haproxy>>
* <<exported-fields-http>>
* <<exported-fields-jolokia>>
Expand Down Expand Up @@ -2788,6 +2789,35 @@ format: bytes
Bytes in non-idle span.
[[exported-fields-graphite]]
== graphite Fields
[]experimental
graphite Module
[float]
== graphite Fields
[float]
== server Fields
server
[float]
=== graphite.server.example
type: keyword
Example field
[[exported-fields-haproxy]]
== HAProxy Fields
Expand Down
41 changes: 41 additions & 0 deletions metricbeat/docs/modules/graphite.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
////
This file is generated! See scripts/docs_collector.py
////

[[metricbeat-module-graphite]]
== graphite Module

This is the graphite Module.



[float]
=== Example configuration

The graphite module supports the standard configuration options that are described
in <<configuration-metricbeat>>. Here is an example configuration:

[source,yaml]
----
metricbeat.modules:
- module: graphite
metricsets: ["server"]
enabled: true
# protocol: "udp"
# templates:
# - filter: "test.*.bash.*" # This would match metrics like test.localhost.bash.stats
# namespace: "test"
# template: ".host.shell.metric*" # test.localhost.bash.stats would become metric=stats and tags host=localhost,shell=bash
# delimiter: "_"
----

[float]
=== Metricsets

The following metricsets are available:

* <<metricbeat-metricset-graphite-server,server>>

include::graphite/server.asciidoc[]

19 changes: 19 additions & 0 deletions metricbeat/docs/modules/graphite/server.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
////
This file is generated! See scripts/docs_collector.py
////

[[metricbeat-metricset-graphite-server]]
include::../../../module/graphite/server/_meta/docs.asciidoc[]


==== Fields

For a description of each field in the metricset, see the
<<exported-fields-graphite,exported fields>> section.

Here is an example document generated by this metricset:

[source,json]
----
include::../../../module/graphite/server/_meta/data.json[]
----
2 changes: 2 additions & 0 deletions metricbeat/docs/modules_list.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ This file is generated! See scripts/docs_collector.py
* <<metricbeat-module-dropwizard,Dropwizard>>
* <<metricbeat-module-elasticsearch,Elasticsearch>>
* <<metricbeat-module-golang,Golang>>
* <<metricbeat-module-graphite,graphite>>
* <<metricbeat-module-haproxy,HAProxy>>
* <<metricbeat-module-http,HTTP>>
* <<metricbeat-module-jolokia,Jolokia>>
Expand Down Expand Up @@ -41,6 +42,7 @@ include::modules/docker.asciidoc[]
include::modules/dropwizard.asciidoc[]
include::modules/elasticsearch.asciidoc[]
include::modules/golang.asciidoc[]
include::modules/graphite.asciidoc[]
include::modules/haproxy.asciidoc[]
include::modules/http.asciidoc[]
include::modules/jolokia.asciidoc[]
Expand Down
28 changes: 28 additions & 0 deletions metricbeat/helper/server/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package server

import "github.com/elastic/beats/libbeat/common"

type Meta common.MapStr

const (
EventDataKey = "data"
)

// Server is an interface that can be used to implement servers which can accept data.
type Server interface {
// Start is used to start the server at a well defined port.
Start()
// Stop the server.
Stop()
// Get a channel of events.
GetEvents() chan Event
}

// Event is an interface that can be used to get the event and event source related information.
type Event interface {
// Get the raw bytes of the event.
GetEvent() common.MapStr
// Get any metadata associated with the data that was received. Ex: client IP for udp message,
// request/response headers for HTTP call.
GetMeta() Meta
}
15 changes: 15 additions & 0 deletions metricbeat/helper/server/tcp/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package tcp

type TcpConfig struct {
Host string `config:"host"`
Port int `config:"port"`
ReceiveBufferSize int `config:"receive_buffer_size"`
}

func defaultTcpConfig() TcpConfig {
return TcpConfig{
Host: "localhost",
Port: 2003,
ReceiveBufferSize: 1024,
}
}
104 changes: 104 additions & 0 deletions metricbeat/helper/server/tcp/tcp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package tcp

import (
"fmt"
"net"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/helper/server"
"github.com/elastic/beats/metricbeat/mb"
)

type TcpServer struct {
listener *net.TCPListener
receiveBufferSize int
done chan struct{}
eventQueue chan server.Event
}

type TcpEvent struct {
event common.MapStr
}

func (m *TcpEvent) GetEvent() common.MapStr {
return m.event
}

func (m *TcpEvent) GetMeta() server.Meta {
return server.Meta{}
}

func NewTcpServer(base mb.BaseMetricSet) (server.Server, error) {
config := defaultTcpConfig()
err := base.Module().UnpackConfig(&config)
if err != nil {
return nil, err
}

addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%d", config.Host, config.Port))

if err != nil {
return nil, err
}

listener, err := net.ListenTCP("tcp", addr)
if err != nil {
return nil, err
}

logp.Info("Started listening for TCP on: %s:%d", config.Host, config.Port)
return &TcpServer{
listener: listener,
receiveBufferSize: config.ReceiveBufferSize,
done: make(chan struct{}),
eventQueue: make(chan server.Event),
}, nil
}

func (g *TcpServer) Start() {
go g.WatchMetrics()
}

func (g *TcpServer) WatchMetrics() {
buffer := make([]byte, g.receiveBufferSize)
for {
select {
case <-g.done:
return
default:
}

conn, err := g.listener.Accept()
if err != nil {
logp.Err("Unable to accept connection due to error: %v", err)
continue
}
defer func() {
if conn != nil {
conn.Close()
}
}()

length, err := conn.Read(buffer)
if err != nil {
logp.Err("Error reading from buffer: %v", err.Error())
continue
}
g.eventQueue <- &TcpEvent{
event: common.MapStr{
server.EventDataKey: buffer[:length],
},
}
}
}

func (g *TcpServer) GetEvents() chan server.Event {
return g.eventQueue
}

func (g *TcpServer) Stop() {
close(g.done)
g.listener.Close()
close(g.eventQueue)
}
79 changes: 79 additions & 0 deletions metricbeat/helper/server/tcp/tcp_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// +build !integration

package tcp

import (
"fmt"
"net"
"testing"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/helper/server"
)

func GetTestTcpServer(host string, port int) (server.Server, error) {
addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%d", host, port))

if err != nil {
return nil, err
}

listener, err := net.ListenTCP("tcp", addr)
if err != nil {
return nil, err
}

logp.Info("Started listening for TCP on: %s:%d", host, port)
return &TcpServer{
listener: listener,
receiveBufferSize: 1024,
done: make(chan struct{}),
eventQueue: make(chan server.Event),
}, nil
}

func TestTcpServer(t *testing.T) {
host := "127.0.0.1"
port := 2003
svc, err := GetTestTcpServer(host, port)
if err != nil {
t.Error(err)
t.FailNow()
}

svc.Start()
defer svc.Stop()
writeToServer(t, "test1", host, port)
msg := <-svc.GetEvents()

assert.True(t, msg.GetEvent() != nil)
ok, _ := msg.GetEvent().HasKey("data")
assert.True(t, ok)
bytes, _ := msg.GetEvent()["data"].([]byte)
assert.True(t, string(bytes) == "test1")

}

func writeToServer(t *testing.T, message, host string, port int) {
servAddr := fmt.Sprintf("%s:%d", host, port)
tcpAddr, err := net.ResolveTCPAddr("tcp", servAddr)
if err != nil {
t.Error(err)
t.FailNow()
}

conn, err := net.DialTCP("tcp", nil, tcpAddr)
if err != nil {
t.Error(err)
t.FailNow()
}

defer conn.Close()
_, err = conn.Write([]byte(message))
if err != nil {
t.Error(err)
t.FailNow()
}
}
15 changes: 15 additions & 0 deletions metricbeat/helper/server/udp/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package udp

type UdpConfig struct {
Host string `config:"host"`
Port int `config:"port"`
ReceiveBufferSize int `config:"receive_buffer_size"`
}

func defaultUdpConfig() UdpConfig {
return UdpConfig{
Host: "localhost",
Port: 2003,
ReceiveBufferSize: 1024,
}
}
Loading

0 comments on commit 38eb3eb

Please sign in to comment.