From 0a9d9787b9c1aff86e092042fdad89179ceec133 Mon Sep 17 00:00:00 2001 From: Vijay Samuel Date: Mon, 10 Sep 2018 16:39:33 -0700 Subject: [PATCH 1/2] Allow TCP helper to support delimiters --- CHANGELOG.asciidoc | 1 + metricbeat/helper/server/tcp/config.go | 15 +++++++- metricbeat/helper/server/tcp/tcp.go | 52 +++++++++++++++++++------- 3 files changed, 54 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 18c3c7c0eeb..8527f5a895a 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -125,6 +125,7 @@ https://github.com/elastic/beats/compare/v6.4.0...master[Check the HEAD diff] - Add experimental socket summary metricset to system module {pull}6782[6782] - Increase ignore_above for system.process.cmdline to 2048. {pull}8101[8100] - Add support to renamed fields planned for redis 5.0. {pull}8167[8167] +- Allow TCP helper to support delimiters. {pull}8278[8278] *Packetbeat* diff --git a/metricbeat/helper/server/tcp/config.go b/metricbeat/helper/server/tcp/config.go index 88c039706a2..def34d1502b 100644 --- a/metricbeat/helper/server/tcp/config.go +++ b/metricbeat/helper/server/tcp/config.go @@ -17,16 +17,29 @@ package tcp +import "fmt" + type TcpConfig struct { Host string `config:"host"` Port int `config:"port"` ReceiveBufferSize int `config:"receive_buffer_size"` + Delimiter string `config:"delimiter"` } func defaultTcpConfig() TcpConfig { return TcpConfig{ Host: "localhost", Port: 2003, - ReceiveBufferSize: 1024, + ReceiveBufferSize: 4096, + Delimiter: "\n", } } + +// Validate ensures that the configured delimiter has only one character +func (t *TcpConfig) Validate() error { + if len(t.Delimiter) != 1 { + return fmt.Errorf("length of delimiter is expected to be 1 but is %v", len(t.Delimiter)) + } + + return nil +} diff --git a/metricbeat/helper/server/tcp/tcp.go b/metricbeat/helper/server/tcp/tcp.go index 7b8529d7023..2b03b690947 100644 --- a/metricbeat/helper/server/tcp/tcp.go +++ b/metricbeat/helper/server/tcp/tcp.go @@ -18,6 +18,7 @@ package tcp import ( + "bufio" "fmt" "net" @@ -35,6 +36,7 @@ type TcpServer struct { receiveBufferSize int done chan struct{} eventQueue chan server.Event + delimiter byte } type TcpEvent struct { @@ -67,6 +69,7 @@ func NewTcpServer(base mb.BaseMetricSet) (server.Server, error) { receiveBufferSize: config.ReceiveBufferSize, done: make(chan struct{}), eventQueue: make(chan server.Event), + delimiter: byte(config.Delimiter[0]), }, nil } @@ -83,7 +86,6 @@ func (g *TcpServer) Start() error { } func (g *TcpServer) watchMetrics() { - buffer := make([]byte, g.receiveBufferSize) for { select { case <-g.done: @@ -96,22 +98,46 @@ func (g *TcpServer) watchMetrics() { logp.Err("Unable to accept connection due to error: %v", err) continue } - defer func() { - if conn != nil { - conn.Close() - } - }() - length, err := conn.Read(buffer) + go g.handle(conn) + } +} + +func (g *TcpServer) handle(conn net.Conn) { + logp.Debug("tcp", "Handling new connection...") + + // Close connection when this function ends + defer func() { + if conn != nil { + conn.Close() + } + }() + + // Get a new reader with buffer size as the same as receiveBufferSize + bufReader := bufio.NewReaderSize(conn, g.receiveBufferSize) + + for { + // Read tokens delimited by delimiter + bytes, err := bufReader.ReadBytes(g.delimiter) if err != nil { - logp.Err("Error reading from buffer: %v", err.Error()) - continue + logp.Debug("tcp", "unable to read bytes due to error: %v", err) + return + } + + // Truncate to max buffer size if too big of a payload + if len(bytes) > g.receiveBufferSize { + bytes = bytes[:g.receiveBufferSize] } - g.eventQueue <- &TcpEvent{ - event: common.MapStr{ - server.EventDataKey: buffer[:length], - }, + + // Drop the delimiter and send the data + if len(bytes) > 0 { + g.eventQueue <- &TcpEvent{ + event: common.MapStr{ + server.EventDataKey: bytes[:len(bytes)-1], + }, + } } + } } From 04378a933c684ae860858fc5c70f8eab0db1a7b7 Mon Sep 17 00:00:00 2001 From: Vijay Samuel Date: Tue, 11 Sep 2018 20:01:23 -0700 Subject: [PATCH 2/2] Incorporating review comments --- metricbeat/helper/server/tcp/tcp.go | 8 ++++---- metricbeat/helper/server/tcp/tcp_test.go | 3 ++- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/metricbeat/helper/server/tcp/tcp.go b/metricbeat/helper/server/tcp/tcp.go index 2b03b690947..d8a2a48b2f5 100644 --- a/metricbeat/helper/server/tcp/tcp.go +++ b/metricbeat/helper/server/tcp/tcp.go @@ -99,7 +99,9 @@ func (g *TcpServer) watchMetrics() { continue } - go g.handle(conn) + if conn != nil { + go g.handle(conn) + } } } @@ -108,9 +110,7 @@ func (g *TcpServer) handle(conn net.Conn) { // Close connection when this function ends defer func() { - if conn != nil { - conn.Close() - } + conn.Close() }() // Get a new reader with buffer size as the same as receiveBufferSize diff --git a/metricbeat/helper/server/tcp/tcp_test.go b/metricbeat/helper/server/tcp/tcp_test.go index 018ef0d1f71..27ad81a060f 100644 --- a/metricbeat/helper/server/tcp/tcp_test.go +++ b/metricbeat/helper/server/tcp/tcp_test.go @@ -43,6 +43,7 @@ func GetTestTcpServer(host string, port int) (server.Server, error) { receiveBufferSize: 1024, done: make(chan struct{}), eventQueue: make(chan server.Event), + delimiter: '\n', }, nil } @@ -62,7 +63,7 @@ func TestTcpServer(t *testing.T) { } defer svc.Stop() - writeToServer(t, "test1", host, port) + writeToServer(t, "test1\n", host, port) msg := <-svc.GetEvents() assert.True(t, msg.GetEvent() != nil)