From 27d22e1352e1a28903fe3a42fb756582ab507664 Mon Sep 17 00:00:00 2001 From: Thibault Cohen Date: Tue, 26 Jan 2016 19:12:54 -0500 Subject: [PATCH] Add tcp/udp check connection input plugin --- plugins/inputs/all/all.go | 1 + plugins/inputs/connection/README.md | 66 ++++++ plugins/inputs/connection/connection.go | 218 +++++++++++++++++++ plugins/inputs/connection/connection_test.go | 198 +++++++++++++++++ 4 files changed, 483 insertions(+) create mode 100644 plugins/inputs/connection/README.md create mode 100644 plugins/inputs/connection/connection.go create mode 100644 plugins/inputs/connection/connection_test.go diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index f263c3f684720..6a0b23480db15 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -4,6 +4,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/aerospike" _ "github.com/influxdata/telegraf/plugins/inputs/apache" _ "github.com/influxdata/telegraf/plugins/inputs/bcache" + _ "github.com/influxdata/telegraf/plugins/inputs/connection" _ "github.com/influxdata/telegraf/plugins/inputs/couchdb" _ "github.com/influxdata/telegraf/plugins/inputs/disque" _ "github.com/influxdata/telegraf/plugins/inputs/docker" diff --git a/plugins/inputs/connection/README.md b/plugins/inputs/connection/README.md new file mode 100644 index 0000000000000..1f46cc66a8572 --- /dev/null +++ b/plugins/inputs/connection/README.md @@ -0,0 +1,66 @@ +# Example Input Plugin + +The input plugin test UDP/TCP connections response time. +It can also check response text. + +### Configuration: + +``` +# List of UDP/TCP connections you want to check +[[inputs.connection]] + protocol = "tcp" + # Server address (default IP localhost) + address = "github.com:80" + # Set timeout (default 1.0) + timeout = 1.0 + # Set read timeout (default 1.0) + read_timeout = 1.0 + # String sent to the server + send = "ssh" + # Expected string in answer + expect = "ssh" + +[[inputs.connection]] + protocol = "tcp" + address = ":80" + +[[inputs.connection]] + protocol = "udp" + # Server address (default IP localhost) + address = "github.com:80" + # Set timeout (default 1.0) + timeout = 1.0 + # Set read timeout (default 1.0) + read_timeout = 1.0 + # String sent to the server + send = "ssh" + # Expected string in answer + expect = "ssh" + +[[inputs.connection]] + protocol = "udp" + address = "localhost:161" + timeout = 2.0 +``` + +### Measurements & Fields: + +- connection + - response_time (float, seconds) + - string_found (bool) # Only if "expected: option is set + +### Tags: + +- All measurements have the following tags: + - host + - port + - protocol + +### Example Output: + +``` +$ ./telegraf -config telegraf.conf -input-filter example -test +connection,host=127.0.0.1,port=22,protocol=tcp response_time=0.18070360500000002,string_found=true 1454785464182527094 +connection,host=127.0.0.1,port=2222,protocol=tcp response_time=1.090124776,string_found=false 1454784433658942325 + +``` diff --git a/plugins/inputs/connection/connection.go b/plugins/inputs/connection/connection.go new file mode 100644 index 0000000000000..fdf0def86bcc2 --- /dev/null +++ b/plugins/inputs/connection/connection.go @@ -0,0 +1,218 @@ +package connection + +import ( + "bufio" + "errors" + "net" + "net/textproto" + "regexp" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs" +) + +// Connections struct +type Connection struct { + Address string + Timeout float64 + ReadTimeout float64 + Send string + Expect string + Protocol string +} + +func (_ *Connection) Description() string { + return "Ping given url(s) and return statistics" +} + +var sampleConfig = ` + [[inputs.connection]] + protocol = "tcp" + # Server address (default IP localhost) + address = "github.com:80" + # Set timeout (default 1.0) + timeout = 1.0 + # Set read timeout (default 1.0) + read_timeout = 1.0 + # String sent to the server + send = "ssh" + # Expected string in answer + expect = "ssh" + + [[inputs.connection]] + protocol = "tcp" + address = ":80" + + [[inputs.connection]] + protocol = "udp" + # Server address (default IP localhost) + address = "github.com:80" + # Set timeout (default 1.0) + timeout = 1.0 + # Set read timeout (default 1.0) + read_timeout = 1.0 + # String sent to the server + send = "ssh" + # Expected string in answer + expect = "ssh" + + [[inputs.connection]] + protocol = "udp" + address = "localhost:161" + timeout = 2.0 +` + +func (_ *Connection) SampleConfig() string { + return sampleConfig +} + +func (t *Connection) TcpGather() (map[string]interface{}, error) { + // Prepare fields + fields := make(map[string]interface{}) + // Start Timer + start := time.Now() + // Resolving + tcpAddr, err := net.ResolveTCPAddr("tcp", t.Address) + // Connecting + conn, err := net.DialTCP("tcp", nil, tcpAddr) + // Stop timer + responseTime := time.Since(start).Seconds() + // Handle error + if err != nil { + return nil, err + } + defer conn.Close() + // Send string if needed + if t.Send != "" { + msg := []byte(t.Send) + conn.Write(msg) + conn.CloseWrite() + // Stop timer + responseTime = time.Since(start).Seconds() + } + // Read string if needed + if t.Expect != "" { + // Set read timeout + conn.SetReadDeadline(time.Now().Add(time.Duration(t.ReadTimeout) * time.Second)) + // Prepare reader + reader := bufio.NewReader(conn) + tp := textproto.NewReader(reader) + // Read + data, err := tp.ReadLine() + // Stop timer + responseTime = time.Since(start).Seconds() + // Handle error + if err != nil { + fields["string_found"] = false + } else { + // Looking for string in answer + RegEx := regexp.MustCompile(`.*` + t.Expect + `.*`) + find := RegEx.FindString(string(data)) + if find != "" { + fields["string_found"] = true + } else { + fields["string_found"] = false + } + } + + } + fields["response_time"] = responseTime + return fields, nil +} + +func (u *Connection) UdpGather() (map[string]interface{}, error) { + // Prepare fields + fields := make(map[string]interface{}) + // Start Timer + start := time.Now() + // Resolving + udpAddr, err := net.ResolveUDPAddr("udp", u.Address) + LocalAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0") + // Connecting + conn, err := net.DialUDP("udp", LocalAddr, udpAddr) + defer conn.Close() + // Handle error + if err != nil { + return nil, err + } + // Send string + msg := []byte(u.Send) + conn.Write(msg) + // Read string + // Set read timeout + conn.SetReadDeadline(time.Now().Add(time.Duration(u.ReadTimeout) * time.Second)) + // Read + buf := make([]byte, 1024) + _, _, err = conn.ReadFromUDP(buf) + // Stop timer + responseTime := time.Since(start).Seconds() + // Handle error + if err != nil { + return nil, err + } else { + // Looking for string in answer + RegEx := regexp.MustCompile(`.*` + u.Expect + `.*`) + find := RegEx.FindString(string(buf)) + if find != "" { + fields["string_found"] = true + } else { + fields["string_found"] = false + } + } + fields["response_time"] = responseTime + return fields, nil +} + +func (c *Connection) Gather(acc telegraf.Accumulator) error { + // Set default values + if c.Timeout == 0 { + c.Timeout = 1.0 + } + if c.ReadTimeout == 0 { + c.ReadTimeout = 1.0 + } + // Check send and expected string + if c.Protocol == "udp" && c.Send == "" { + return errors.New("Send string cannot be empty") + } + if c.Protocol == "udp" && c.Expect == "" { + return errors.New("Expected string cannot be empty") + } + // Prepare host and port + host, port, err := net.SplitHostPort(c.Address) + if err != nil { + return err + } + if host == "" { + c.Address = "localhost:" + port + } + if port == "" { + return errors.New("Bad port") + } + // Prepare data + tags := map[string]string{"host": host, "port": port} + var fields map[string]interface{} + // Gather data + if c.Protocol == "tcp" { + fields, err = c.TcpGather() + tags["protocol"] = "tcp" + } else if c.Protocol == "udp" { + fields, err = c.UdpGather() + tags["protocol"] = "udp" + } else { + return errors.New("Bad protocol") + } + if err != nil { + return err + } + // Add metrics + acc.AddFields("connection", fields, tags) + return nil +} + +func init() { + inputs.Add("connection", func() telegraf.Input { + return &Connection{} + }) +} diff --git a/plugins/inputs/connection/connection_test.go b/plugins/inputs/connection/connection_test.go new file mode 100644 index 0000000000000..c01c3daaac632 --- /dev/null +++ b/plugins/inputs/connection/connection_test.go @@ -0,0 +1,198 @@ +package connection + +import ( + "net" + "regexp" + "sync" + "testing" + + "github.com/influxdata/telegraf/testutil" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestBadProtocol(t *testing.T) { + var acc testutil.Accumulator + // Init plugin + c := Connection{ + Protocol: "unknownprotocol", + Address: ":9999", + } + // Error + err1 := c.Gather(&acc) + require.Error(t, err1) + assert.Equal(t, "Bad protocol", err1.Error()) +} + +func TestTCPError(t *testing.T) { + var acc testutil.Accumulator + // Init plugin + c := Connection{ + Protocol: "tcp", + Address: ":9999", + } + // Error + err1 := c.Gather(&acc) + require.Error(t, err1) + assert.Equal(t, "dial tcp 127.0.0.1:9999: getsockopt: connection refused", err1.Error()) +} + +func TestTCPOK1(t *testing.T) { + var wg sync.WaitGroup + var acc testutil.Accumulator + // Init plugin + c := Connection{ + Address: "127.0.0.1:2004", + Send: "test", + Expect: "test", + ReadTimeout: 3.0, + Timeout: 1.0, + Protocol: "tcp", + } + // Start TCP server + wg.Add(1) + go TCPServer(t, &wg) + wg.Wait() + // Connect + wg.Add(1) + err1 := c.Gather(&acc) + wg.Wait() + // Override response time + for _, p := range acc.Metrics { + p.Fields["response_time"] = 1.0 + } + require.NoError(t, err1) + acc.AssertContainsTaggedFields(t, + "connection", + map[string]interface{}{ + "string_found": true, + "response_time": 1.0, + }, + map[string]string{"host": "127.0.0.1", + "port": "2004", + "protocol": "tcp", + }, + ) + // Waiting TCPserver + wg.Wait() +} + +func TestTCPOK2(t *testing.T) { + var wg sync.WaitGroup + var acc testutil.Accumulator + // Init plugin + c := Connection{ + Address: "127.0.0.1:2004", + Send: "test", + Expect: "test2", + ReadTimeout: 3.0, + Timeout: 1.0, + Protocol: "tcp", + } + // Start TCP server + wg.Add(1) + go TCPServer(t, &wg) + wg.Wait() + // Connect + wg.Add(1) + err1 := c.Gather(&acc) + wg.Wait() + // Override response time + for _, p := range acc.Metrics { + p.Fields["response_time"] = 1.0 + } + require.NoError(t, err1) + acc.AssertContainsTaggedFields(t, + "connection", + map[string]interface{}{ + "string_found": false, + "response_time": 1.0, + }, + map[string]string{"host": "127.0.0.1", + "port": "2004", + "protocol": "tcp", + }, + ) + // Waiting TCPserver + wg.Wait() +} + +func TestUDPrror(t *testing.T) { + var acc testutil.Accumulator + // Init plugin + c := Connection{ + Address: ":9999", + Send: "test", + Expect: "test", + Protocol: "udp", + } + // Error + err1 := c.Gather(&acc) + require.Error(t, err1) + assert.Regexp(t, regexp.MustCompile(`read udp 127.0.0.1:[0-9]*->127.0.0.1:9999: recvfrom: connection refused`), err1.Error()) +} + +func TestUDPOK1(t *testing.T) { + var wg sync.WaitGroup + var acc testutil.Accumulator + // Init plugin + c := Connection{ + Address: "127.0.0.1:2004", + Send: "test", + Expect: "test", + ReadTimeout: 3.0, + Timeout: 1.0, + Protocol: "udp", + } + // Start UDP server + wg.Add(1) + go UDPServer(t, &wg) + wg.Wait() + // Connect + wg.Add(1) + err1 := c.Gather(&acc) + wg.Wait() + // Override response time + for _, p := range acc.Metrics { + p.Fields["response_time"] = 1.0 + } + require.NoError(t, err1) + acc.AssertContainsTaggedFields(t, + "connection", + map[string]interface{}{ + "string_found": true, + "response_time": 1.0, + }, + map[string]string{"host": "127.0.0.1", + "port": "2004", + "protocol": "udp", + }, + ) + // Waiting TCPserver + wg.Wait() +} + +func UDPServer(t *testing.T, wg *sync.WaitGroup) { + udpAddr, _ := net.ResolveUDPAddr("udp", "127.0.0.1:2004") + conn, _ := net.ListenUDP("udp", udpAddr) + wg.Done() + buf := make([]byte, 1024) + _, remoteaddr, _ := conn.ReadFromUDP(buf) + conn.WriteToUDP(buf, remoteaddr) + conn.Close() + wg.Done() +} + +func TCPServer(t *testing.T, wg *sync.WaitGroup) { + tcpAddr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:2004") + tcpServer, _ := net.ListenTCP("tcp", tcpAddr) + wg.Done() + conn, _ := tcpServer.AcceptTCP() + buf := make([]byte, 1024) + conn.Read(buf) + conn.Write(buf) + conn.CloseWrite() + tcpServer.Close() + wg.Done() +}