Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow TCP helper to support delimiters #8278

Merged
merged 2 commits into from
Sep 12, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ https://github.com/elastic/beats/compare/v6.4.0...master[Check the HEAD diff]
- Add experimental socket summary metricset to system module {pull}6782[6782]
- Increase ignore_above for system.process.cmdline to 2048. {pull}8101[8100]
- Add support to renamed fields planned for redis 5.0. {pull}8167[8167]
- Allow TCP helper to support delimiters. {pull}8278[8278]

*Packetbeat*

Expand Down
15 changes: 14 additions & 1 deletion metricbeat/helper/server/tcp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,29 @@

package tcp

import "fmt"

type TcpConfig struct {
Host string `config:"host"`
Port int `config:"port"`
ReceiveBufferSize int `config:"receive_buffer_size"`
Delimiter string `config:"delimiter"`
}

func defaultTcpConfig() TcpConfig {
return TcpConfig{
Host: "localhost",
Port: 2003,
ReceiveBufferSize: 1024,
ReceiveBufferSize: 4096,
Delimiter: "\n",
}
}

// Validate ensures that the configured delimiter has only one character
func (t *TcpConfig) Validate() error {
if len(t.Delimiter) != 1 {
return fmt.Errorf("length of delimiter is expected to be 1 but is %v", len(t.Delimiter))
}

return nil
}
52 changes: 39 additions & 13 deletions metricbeat/helper/server/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package tcp

import (
"bufio"
"fmt"
"net"

Expand All @@ -35,6 +36,7 @@ type TcpServer struct {
receiveBufferSize int
done chan struct{}
eventQueue chan server.Event
delimiter byte
}

type TcpEvent struct {
Expand Down Expand Up @@ -67,6 +69,7 @@ func NewTcpServer(base mb.BaseMetricSet) (server.Server, error) {
receiveBufferSize: config.ReceiveBufferSize,
done: make(chan struct{}),
eventQueue: make(chan server.Event),
delimiter: byte(config.Delimiter[0]),
}, nil
}

Expand All @@ -83,7 +86,6 @@ func (g *TcpServer) Start() error {
}

func (g *TcpServer) watchMetrics() {
buffer := make([]byte, g.receiveBufferSize)
for {
select {
case <-g.done:
Expand All @@ -96,22 +98,46 @@ func (g *TcpServer) watchMetrics() {
logp.Err("Unable to accept connection due to error: %v", err)
continue
}
defer func() {
if conn != nil {
conn.Close()
}
}()

length, err := conn.Read(buffer)
if conn != nil {
go g.handle(conn)
}
}
}

func (g *TcpServer) handle(conn net.Conn) {
logp.Debug("tcp", "Handling new connection...")

// Close connection when this function ends
defer func() {
conn.Close()
}()

// Get a new reader with buffer size as the same as receiveBufferSize
bufReader := bufio.NewReaderSize(conn, g.receiveBufferSize)

for {
// Read tokens delimited by delimiter
bytes, err := bufReader.ReadBytes(g.delimiter)
jsoriano marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
logp.Err("Error reading from buffer: %v", err.Error())
continue
logp.Debug("tcp", "unable to read bytes due to error: %v", err)
return
}

// Truncate to max buffer size if too big of a payload
if len(bytes) > g.receiveBufferSize {
bytes = bytes[:g.receiveBufferSize]
}
g.eventQueue <- &TcpEvent{
event: common.MapStr{
server.EventDataKey: buffer[:length],
},

// Drop the delimiter and send the data
if len(bytes) > 0 {
g.eventQueue <- &TcpEvent{
event: common.MapStr{
server.EventDataKey: bytes[:len(bytes)-1],
},
}
}

}
}

Expand Down
3 changes: 2 additions & 1 deletion metricbeat/helper/server/tcp/tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func GetTestTcpServer(host string, port int) (server.Server, error) {
receiveBufferSize: 1024,
done: make(chan struct{}),
eventQueue: make(chan server.Event),
delimiter: '\n',
}, nil
}

Expand All @@ -62,7 +63,7 @@ func TestTcpServer(t *testing.T) {
}

defer svc.Stop()
writeToServer(t, "test1", host, port)
writeToServer(t, "test1\n", host, port)
msg := <-svc.GetEvents()

assert.True(t, msg.GetEvent() != nil)
Expand Down