Skip to content

Commit

Permalink
Feature: TCP Input
Browse files Browse the repository at this point in the history
Allow to receive new events via TCP, this will create a new event per
line and add some useful information about the connected client to the
evant. This input is marked as **experimental**.

This input expose the following settings:

- `line_delimiter`: The characters used to split incoming events, by
default it will split on `\n` and will also strip both `\n` and `\r`.
You can also use any characters like `;` and you can also used multiple
characters delimiter like `<END>`, the delimiter tokens will always be
removed from the string.

- `max_message_size`: This is a number of bytes that a client can buffer
in memory before finding a new message, if the limit is reached the
connection is killed and the event is logged. This is to prevent rogue
clients to DoS attack by consuming all the available memory. The default
values is 20971520.

- `timeout`: The server will close any client that reach this inactivity
timeout.

Ref: elastic#5862
  • Loading branch information
ph committed Mar 27, 2018
1 parent 88998f9 commit 80133fb
Show file tree
Hide file tree
Showing 14 changed files with 971 additions and 0 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di
the system test. {pull}6121[6121]
- Add IIS module to parse access log and error log. {pull}6127[6127]
- Remove the undefined `username` option from the Redis input and clarify the documentation. {pull}6662[6662]
- Refactor the usage of prospector to input in the YAML reference {pull}6121[6121]
- Addition of the TCP input {pull}6266[6266]

*Heartbeat*

Expand Down
14 changes: 14 additions & 0 deletions filebeat/_meta/common.reference.p2.yml
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,20 @@ filebeat.inputs:
# Maximum size of the message received over UDP
#max_message_size: 10240

#------------------------------ TCP prospector --------------------------------
# Experimental: Config options for the TCP input
#- type: tcp
#enabled: false

# The host and port to receive the new event
#host: "localhost:9000"

# Character used to split new message
#line_delimiter: "\n"

# Maximum size in bytes of the message received over TCP
#max_message_size: 20971520

#========================== Filebeat autodiscover ==============================

# Autodiscover allows you to detect changes in the system and spawn new modules
Expand Down
14 changes: 14 additions & 0 deletions filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,20 @@ filebeat.inputs:
# Maximum size of the message received over UDP
#max_message_size: 10240

#------------------------------ TCP prospector --------------------------------
# Experimental: Config options for the TCP input
#- type: tcp
#enabled: false

# The host and port to receive the new event
#host: "localhost:9000"

# Character used to split new message
#line_delimiter: "\n"

# Maximum size in bytes of the message received over TCP
#max_message_size: 20971520

#========================== Filebeat autodiscover ==============================

# Autodiscover allows you to detect changes in the system and spawn new modules
Expand Down
1 change: 1 addition & 0 deletions filebeat/include/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ import (
_ "github.com/elastic/beats/filebeat/input/log"
_ "github.com/elastic/beats/filebeat/input/redis"
_ "github.com/elastic/beats/filebeat/input/stdin"
_ "github.com/elastic/beats/filebeat/input/tcp"
_ "github.com/elastic/beats/filebeat/input/udp"
)
115 changes: 115 additions & 0 deletions filebeat/input/tcp/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package tcp

import (
"bufio"
"net"
"strings"
"time"

"github.com/pkg/errors"

"github.com/elastic/beats/filebeat/harvester"
"github.com/elastic/beats/filebeat/util"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
)

// Client is a remote client.
type Client struct {
conn net.Conn
forwarder *harvester.Forwarder
done chan struct{}
metadata common.MapStr
splitFunc bufio.SplitFunc
maxReadMessage int64
timeout time.Duration
}

// NewClient returns a new client instance for the remote connection.
func NewClient(
conn net.Conn,
forwarder *harvester.Forwarder,
splitFunc bufio.SplitFunc,
maxReadMessage int64,
timeout time.Duration,
) *Client {
client := &Client{
conn: conn,
forwarder: forwarder,
done: make(chan struct{}),
splitFunc: splitFunc,
maxReadMessage: maxReadMessage,
timeout: timeout,
}
client.cacheMetadata()
return client
}

// Handle is reading message from the specified TCP socket.
func (c *Client) Handle() error {
r := NewMeteredReader(NewDeadlineReader(c.conn, c.timeout), c.maxReadMessage)
buf := bufio.NewReader(r)
scanner := bufio.NewScanner(buf)
scanner.Split(c.splitFunc)

for scanner.Scan() {
if scanner.Err() != nil {
// we are forcing a close on the socket, lets ignore any error that could happen.
select {
case <-c.done:
break
default:
}
// This is a user defined limit and we should notify the user.
if IsMaxReadBufferErr(scanner.Err()) {
logp.Err("tcp client error: %s", scanner.Err())
}
return errors.Wrap(scanner.Err(), "tcp client error")
}
r.Reset()
c.forwarder.Send(c.createEvent(scanner.Text()))
}
return nil
}

// Close stops reading from the socket and close the connection.
func (c *Client) Close() {
close(c.done)
c.conn.Close()
}

func (c *Client) createEvent(rawString string) *util.Data {
data := util.NewData()
data.Event = beat.Event{
Timestamp: time.Now(),
Meta: c.metadata,
Fields: common.MapStr{
"message": rawString,
},
}
return data
}

// GetRemoteHosts take the IP address of the client and try to resolve the name, if it fails we
// fallback to the IP, IP can resolve to multiple hostname.
func (c *Client) getRemoteHosts() []string {
ip := c.conn.RemoteAddr().String()
idx := strings.Index(ip, ":")
if idx == -1 {
return []string{ip}
}
ip = ip[0:idx]
hosts, err := net.LookupAddr(ip)
if err != nil {
hosts = []string{ip}
}
return hosts
}

func (c *Client) cacheMetadata() {
c.metadata = common.MapStr{
"hostnames": c.getRemoteHosts(),
"ip_address": c.conn.RemoteAddr().String(),
}
}
25 changes: 25 additions & 0 deletions filebeat/input/tcp/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package tcp

import (
"time"

"github.com/elastic/beats/filebeat/harvester"
)

type config struct {
harvester.ForwarderConfig `config:",inline"`
Host string `config:"host"`
LineDelimiter string `config:"line_delimiter" validate:"nonzero"`
Timeout time.Duration `config:"timeout" validate:"nonzero,positive"`
MaxMessageSize int64 `config:"max_message_size" validate:"nonzero,positive"`
}

var defaultConfig = config{
ForwarderConfig: harvester.ForwarderConfig{
Type: "tcp",
},
LineDelimiter: "\n",
Host: "localhost:9000",
Timeout: time.Second * 5 * 60,
MaxMessageSize: 20 * 1024 * 1024,
}
72 changes: 72 additions & 0 deletions filebeat/input/tcp/conn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package tcp

import (
"io"
"net"
"time"

"github.com/pkg/errors"
)

// ErrMaxReadBuffer returns when too many bytes was read on the io.Reader
var ErrMaxReadBuffer = errors.New("max read buffer reached")

// MeteredReader is based on LimitedReader but allow to reset the byte read and return a specific
// error when we reach the limit.
type MeteredReader struct {
reader io.Reader
maxReadBuffer int64
byteRead int64
}

// NewMeteredReader returns a new MeteredReader
func NewMeteredReader(reader io.Reader, maxReadBuffer int64) *MeteredReader {
return &MeteredReader{
reader: reader,
maxReadBuffer: maxReadBuffer,
}
}

// Read reads the specified amount of byte
func (m *MeteredReader) Read(p []byte) (n int, err error) {
if m.byteRead >= m.maxReadBuffer {
return 0, ErrMaxReadBuffer
}
n, err = m.reader.Read(p)
m.byteRead += int64(n)
return
}

// Reset resets the number of byte read
func (m *MeteredReader) Reset() {
m.byteRead = 0
}

// IsMaxReadBufferErr returns true when the error is ErrMaxReadBuffer
func IsMaxReadBufferErr(err error) bool {
return err == ErrMaxReadBuffer
}

// DeadlineReader allow read to a io.Reader to timeout, the timeout is refreshed on every read.
type DeadlineReader struct {
conn net.Conn
timeout time.Duration
}

// NewDeadlineReader returns a new DeadlineReader
func NewDeadlineReader(c net.Conn, timeout time.Duration) *DeadlineReader {
return &DeadlineReader{
conn: c,
timeout: timeout,
}
}

// Read reads the number of bytes from the reader
func (d *DeadlineReader) Read(p []byte) (n int, err error) {
d.refresh()
return d.conn.Read(p)
}

func (d *DeadlineReader) refresh() {
d.conn.SetDeadline(time.Now().Add(d.timeout))
}
43 changes: 43 additions & 0 deletions filebeat/input/tcp/conn_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package tcp

import (
"strings"
"testing"

"github.com/stretchr/testify/assert"
)

func TestMeteredReader(t *testing.T) {
maxReadBuffer := 400

t.Run("WhenMaxReadIsReachedInMultipleRead", func(t *testing.T) {
r := strings.NewReader(randomString(maxReadBuffer * 2))
m := NewMeteredReader(r, int64(maxReadBuffer))
toRead := make([]byte, maxReadBuffer)
_, err := m.Read(toRead)
assert.NoError(t, err)
toRead = make([]byte, 300)
_, err = m.Read(toRead)
assert.Equal(t, ErrMaxReadBuffer, err)
})

t.Run("WhenMaxReadIsNotReached", func(t *testing.T) {
r := strings.NewReader(randomString(maxReadBuffer * 2))
m := NewMeteredReader(r, int64(maxReadBuffer))
toRead := make([]byte, maxReadBuffer)
_, err := m.Read(toRead)
assert.NoError(t, err)
})

t.Run("WhenResetIsCalled", func(t *testing.T) {
r := strings.NewReader(randomString(maxReadBuffer * 2))
m := NewMeteredReader(r, int64(maxReadBuffer))
toRead := make([]byte, maxReadBuffer)
_, err := m.Read(toRead)
assert.NoError(t, err)
m.Reset()
toRead = make([]byte, 300)
_, err = m.Read(toRead)
assert.NoError(t, err)
})
}
Loading

0 comments on commit 80133fb

Please sign in to comment.