From 6842a4ab650057b942651ba123f987e80720a9e4 Mon Sep 17 00:00:00 2001 From: ph Date: Fri, 26 Jan 2018 10:09:05 -0500 Subject: [PATCH 1/3] Feature: TCP Input Allow to receive new events via TCP, this will create a new event per line and add some useful information about the connected client to the evant. This input is marked as **experimental**. This input expose the following settings: - `line_delimiter`: The characters used to split incoming events, by default it will split on `\n` and will also strip both `\n` and `\r`. You can also use any characters like `;` and you can also used multiple characters delimiter like ``, the delimiter tokens will always be removed from the string. - `max_message_size`: This is a number of bytes that a client can buffer in memory before finding a new message, if the limit is reached the connection is killed and the event is logged. This is to prevent rogue clients to DoS attack by consuming all the available memory. The default values is 20971520. - `timeout`: The server will close any client that reach this inactivity timeout. Ref: #5862 --- CHANGELOG.asciidoc | 1 + filebeat/_meta/common.reference.p2.yml | 17 ++ filebeat/docs/filebeat-options.asciidoc | 5 +- filebeat/docs/inputs/input-tcp.asciidoc | 55 ++++++ filebeat/filebeat.reference.yml | 17 ++ filebeat/include/list.go | 1 + filebeat/input/tcp/config.go | 26 +++ filebeat/input/tcp/input.go | 109 +++++++++++ filebeat/input/tcp/input_test.go | 30 +++ filebeat/inputsource/tcp/client.go | 78 ++++++++ filebeat/inputsource/tcp/config.go | 36 ++++ filebeat/inputsource/tcp/conn.go | 72 +++++++ filebeat/inputsource/tcp/conn_test.go | 43 +++++ filebeat/inputsource/tcp/scan.go | 31 +++ filebeat/inputsource/tcp/scan_test.go | 91 +++++++++ filebeat/inputsource/tcp/server.go | 167 ++++++++++++++++ filebeat/inputsource/tcp/server_test.go | 244 ++++++++++++++++++++++++ filebeat/tests/system/test_tcp.py | 68 +++++++ 18 files changed, 1090 insertions(+), 1 deletion(-) create mode 100644 filebeat/docs/inputs/input-tcp.asciidoc create mode 100644 filebeat/input/tcp/config.go create mode 100644 filebeat/input/tcp/input.go create mode 100644 filebeat/input/tcp/input_test.go create mode 100644 filebeat/inputsource/tcp/client.go create mode 100644 filebeat/inputsource/tcp/config.go create mode 100644 filebeat/inputsource/tcp/conn.go create mode 100644 filebeat/inputsource/tcp/conn_test.go create mode 100644 filebeat/inputsource/tcp/scan.go create mode 100644 filebeat/inputsource/tcp/scan_test.go create mode 100644 filebeat/inputsource/tcp/server.go create mode 100644 filebeat/inputsource/tcp/server_test.go create mode 100644 filebeat/tests/system/test_tcp.py diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 5257a31b9e9..5f2b3eb6eef 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -52,6 +52,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di - Remove the undefined `username` option from the Redis input and clarify the documentation. {pull}6662[6662] - Add validation for Stdin, when Filebeat is configured with Stdin and any other inputs, Filebeat will now refuses to start. {pull}6463[6463] +- Addition of the TCP input {pull}6700[6700] *Heartbeat* diff --git a/filebeat/_meta/common.reference.p2.yml b/filebeat/_meta/common.reference.p2.yml index 4e922d48a92..91e66d01300 100644 --- a/filebeat/_meta/common.reference.p2.yml +++ b/filebeat/_meta/common.reference.p2.yml @@ -240,6 +240,23 @@ filebeat.inputs: # Maximum size of the message received over UDP #max_message_size: 10240 +#------------------------------ TCP prospector -------------------------------- +# Experimental: Config options for the TCP input +#- type: tcp + #enabled: false + + # The host and port to receive the new event + #host: "localhost:9000" + + # Character used to split new message + #line_delimiter: "\n" + + # Maximum size in bytes of the message received over TCP + #max_message_size: 20MiB + + # The number of seconds of inactivity before a remote connection is closed. + #timeout: 300s + #========================== Filebeat autodiscover ============================== # Autodiscover allows you to detect changes in the system and spawn new modules diff --git a/filebeat/docs/filebeat-options.asciidoc b/filebeat/docs/filebeat-options.asciidoc index 6b1ace6c394..ba2109e7e33 100644 --- a/filebeat/docs/filebeat-options.asciidoc +++ b/filebeat/docs/filebeat-options.asciidoc @@ -14,7 +14,7 @@ and configuring modules. To configure {beatname_uc} manually (instead of using <<{beatname_lc}-modules-overview,modules>>), you specify a list of inputs in the +{beatname_lc}.inputs+ section of the +{beatname_lc}.yml+. Inputs specify how -{beatname_uc} locates and processes input data. +{beatname_uc} locates and processes input data. The list is a http://yaml.org/[YAML] array, so each input begins with a dash (`-`). You can specify multiple inputs, and you can specify the same @@ -47,6 +47,7 @@ You can configure {beatname_uc} to use the following inputs: * <<{beatname_lc}-input-redis>> * <<{beatname_lc}-input-udp>> * <<{beatname_lc}-input-docker>> +* <<{beatname_lc}-input-tcp>> @@ -59,3 +60,5 @@ include::inputs/input-redis.asciidoc[] include::inputs/input-udp.asciidoc[] include::inputs/input-docker.asciidoc[] + +include::inputs/input-tcp.asciidoc[] diff --git a/filebeat/docs/inputs/input-tcp.asciidoc b/filebeat/docs/inputs/input-tcp.asciidoc new file mode 100644 index 00000000000..685bee40661 --- /dev/null +++ b/filebeat/docs/inputs/input-tcp.asciidoc @@ -0,0 +1,55 @@ +:type: tcp + +[id="{beatname_lc}-input-{type}"] +=== TCP input + +++++ +TCP +++++ + +Use the `TCP` input to read events over TCP. + +Example configuration: + +["source","yaml",subs="attributes"] +---- +{beatname_lc}.inputs: +- type: tcp + max_message_size: 10240 + host: "localhost:9000" +---- + + +==== Configuration options + +The `tcp` input supports the following configuration options plus the +<<{beatname_lc}-input-{type}-common-options>> described later. + +[float] +[id="{beatname_lc}-input-{type}-max-message-size"] +==== `max_message_size` + +The maximum size of the message received over TCP. The default is `20MiB`. + +[float] +[id="{beatname_lc}-input-{type}-host"] +==== `host` + +The host and TCP port to listen on for event streams. + +[float] +[id="{beatname_lc}-input-{type}-line-delimiter"] +==== `line_delimiter` + +Specify the characters used to split the incoming events. The default is '\n'. + +[float] +[id="{beatname_lc}-input-{type}-timeout"] +==== `timeout` + +The number of seconds of inactivity before a remote connection is closed. The default is `300s`. + +[id="{beatname_lc}-input-{type}-common-options"] +include::../inputs/input-common-options.asciidoc[] + +:type!: diff --git a/filebeat/filebeat.reference.yml b/filebeat/filebeat.reference.yml index 664dcea51be..15506ec236b 100644 --- a/filebeat/filebeat.reference.yml +++ b/filebeat/filebeat.reference.yml @@ -535,6 +535,23 @@ filebeat.inputs: # Maximum size of the message received over UDP #max_message_size: 10240 +#------------------------------ TCP prospector -------------------------------- +# Experimental: Config options for the TCP input +#- type: tcp + #enabled: false + + # The host and port to receive the new event + #host: "localhost:9000" + + # Character used to split new message + #line_delimiter: "\n" + + # Maximum size in bytes of the message received over TCP + #max_message_size: 20MiB + + # The number of seconds of inactivity before a remote connection is closed. + #timeout: 300s + #========================== Filebeat autodiscover ============================== # Autodiscover allows you to detect changes in the system and spawn new modules diff --git a/filebeat/include/list.go b/filebeat/include/list.go index e1aae4e763a..374be396ece 100644 --- a/filebeat/include/list.go +++ b/filebeat/include/list.go @@ -12,5 +12,6 @@ import ( _ "github.com/elastic/beats/filebeat/input/log" _ "github.com/elastic/beats/filebeat/input/redis" _ "github.com/elastic/beats/filebeat/input/stdin" + _ "github.com/elastic/beats/filebeat/input/tcp" _ "github.com/elastic/beats/filebeat/input/udp" ) diff --git a/filebeat/input/tcp/config.go b/filebeat/input/tcp/config.go new file mode 100644 index 00000000000..7b30b2a9640 --- /dev/null +++ b/filebeat/input/tcp/config.go @@ -0,0 +1,26 @@ +package tcp + +import ( + "time" + + "github.com/dustin/go-humanize" + + "github.com/elastic/beats/filebeat/harvester" + "github.com/elastic/beats/filebeat/inputsource/tcp" +) + +type config struct { + tcp.Config `config:",inline"` + harvester.ForwarderConfig `config:",inline"` +} + +var defaultConfig = config{ + ForwarderConfig: harvester.ForwarderConfig{ + Type: "tcp", + }, + Config: tcp.Config{ + LineDelimiter: "\n", + Timeout: time.Minute * 5, + MaxMessageSize: 20 * humanize.MiByte, + }, +} diff --git a/filebeat/input/tcp/input.go b/filebeat/input/tcp/input.go new file mode 100644 index 00000000000..2153d4154a0 --- /dev/null +++ b/filebeat/input/tcp/input.go @@ -0,0 +1,109 @@ +package tcp + +import ( + "time" + + "github.com/elastic/beats/filebeat/channel" + "github.com/elastic/beats/filebeat/harvester" + "github.com/elastic/beats/filebeat/input" + "github.com/elastic/beats/filebeat/inputsource/tcp" + "github.com/elastic/beats/filebeat/util" + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/atomic" + "github.com/elastic/beats/libbeat/common/cfgwarn" + "github.com/elastic/beats/libbeat/logp" +) + +func init() { + err := input.Register("tcp", NewInput) + if err != nil { + panic(err) + } +} + +// Input for TCP connection +type Input struct { + server *tcp.Server + started atomic.Bool + outlet channel.Outleter + config *config + log *logp.Logger +} + +// NewInput creates a new TCP input +func NewInput( + cfg *common.Config, + outlet channel.Factory, + context input.Context, +) (input.Input, error) { + cfgwarn.Experimental("TCP input type is used") + + out, err := outlet(cfg, context.DynamicFields) + if err != nil { + return nil, err + } + + forwarder := harvester.NewForwarder(out) + + config := defaultConfig + err = cfg.Unpack(&config) + if err != nil { + return nil, err + } + + cb := func(data []byte, metadata tcp.Metadata) { + event := createEvent(data, metadata) + forwarder.Send(event) + } + + server, err := tcp.New(cb, &config.Config) + if err != nil { + return nil, err + } + + return &Input{ + server: server, + started: atomic.MakeBool(false), + outlet: out, + config: &config, + log: logp.NewLogger("tcp input").With(config.Config.Host), + }, nil +} + +// Run start a TCP input +func (p *Input) Run() { + if !p.started.Load() { + p.log.Info("Starting TCP input") + err := p.server.Start() + if err != nil { + p.log.Errorw("Error starting the TCP server", "error", err) + } + p.started.Swap(true) + } +} + +// Stop stops TCP server +func (p *Input) Stop() { + p.log.Info("Stopping TCP input") + defer p.outlet.Close() + defer p.started.Swap(false) + p.server.Stop() +} + +// Wait stop the current server +func (p *Input) Wait() { + p.Stop() +} + +func createEvent(raw []byte, metadata tcp.Metadata) *util.Data { + data := util.NewData() + data.Event = beat.Event{ + Timestamp: time.Now(), + Fields: common.MapStr{ + "message": string(raw), + "source": metadata.RemoteAddr.String(), + }, + } + return data +} diff --git a/filebeat/input/tcp/input_test.go b/filebeat/input/tcp/input_test.go new file mode 100644 index 00000000000..80a32f7a9e5 --- /dev/null +++ b/filebeat/input/tcp/input_test.go @@ -0,0 +1,30 @@ +package tcp + +import ( + "net" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/filebeat/inputsource/tcp" +) + +func TestCreateEvent(t *testing.T) { + hello := "hello world" + ip := "127.0.0.1" + parsedIP := net.ParseIP(ip) + addr := &net.IPAddr{IP: parsedIP, Zone: ""} + + message := []byte(hello) + mt := tcp.Metadata{RemoteAddr: addr} + + data := createEvent(message, mt) + event := data.GetEvent() + + m, err := event.GetValue("message") + assert.NoError(t, err) + assert.Equal(t, string(message), m) + + from, _ := event.GetValue("source") + assert.Equal(t, ip, from) +} diff --git a/filebeat/inputsource/tcp/client.go b/filebeat/inputsource/tcp/client.go new file mode 100644 index 00000000000..b9c4528a1d2 --- /dev/null +++ b/filebeat/inputsource/tcp/client.go @@ -0,0 +1,78 @@ +package tcp + +import ( + "bufio" + "net" + "time" + + "github.com/pkg/errors" + + "github.com/elastic/beats/libbeat/logp" +) + +// Client is a remote client. +type client struct { + conn net.Conn + log *logp.Logger + callback CallbackFunc + done chan struct{} + metadata Metadata + splitFunc bufio.SplitFunc + maxReadMessage size + timeout time.Duration +} + +func newClient( + conn net.Conn, + log *logp.Logger, + callback CallbackFunc, + splitFunc bufio.SplitFunc, + maxReadMessage size, + timeout time.Duration, +) *client { + client := &client{ + conn: conn, + log: log.With("address", conn.RemoteAddr()), + callback: callback, + done: make(chan struct{}), + splitFunc: splitFunc, + maxReadMessage: maxReadMessage, + timeout: timeout, + metadata: Metadata{ + RemoteAddr: conn.RemoteAddr(), + }, + } + return client +} + +func (c *client) handle() error { + r := NewResetableLimitedReader(NewDeadlineReader(c.conn, c.timeout), uint64(c.maxReadMessage)) + buf := bufio.NewReader(r) + scanner := bufio.NewScanner(buf) + scanner.Split(c.splitFunc) + + for scanner.Scan() { + err := scanner.Err() + if err != nil { + // we are forcing a close on the socket, lets ignore any error that could happen. + select { + case <-c.done: + break + default: + } + // This is a user defined limit and we should notify the user. + if IsMaxReadBufferErr(err) { + c.log.Errorw("client errors", "error", err) + } + return errors.Wrap(err, "tcp client error") + } + r.Reset() + c.callback(scanner.Bytes(), c.metadata) + } + return nil +} + +func (c *client) close() { + close(c.done) + c.conn.Close() +} diff --git a/filebeat/inputsource/tcp/config.go b/filebeat/inputsource/tcp/config.go new file mode 100644 index 00000000000..c4164dac8c5 --- /dev/null +++ b/filebeat/inputsource/tcp/config.go @@ -0,0 +1,36 @@ +package tcp + +import ( + "fmt" + "time" + + "github.com/dustin/go-humanize" +) + +type size uint64 + +// Config exposes the tcp configuration. +type Config struct { + Host string `config:"host"` + LineDelimiter string `config:"line_delimiter" validate:"nonzero"` + Timeout time.Duration `config:"timeout" validate:"nonzero,positive"` + MaxMessageSize size `config:"max_message_size" validate:"nonzero,positive"` +} + +// Validate validates the Config option for the tcp input. +func (c *Config) Validate() error { + if len(c.Host) == 0 { + return fmt.Errorf("need to specify the host using the `host:port` syntax") + } + return nil +} + +func (s *size) Unpack(value string) error { + sz, err := humanize.ParseBytes(value) + if err != nil { + return err + } + + *s = size(sz) + return nil +} diff --git a/filebeat/inputsource/tcp/conn.go b/filebeat/inputsource/tcp/conn.go new file mode 100644 index 00000000000..a2a40bb02f0 --- /dev/null +++ b/filebeat/inputsource/tcp/conn.go @@ -0,0 +1,72 @@ +package tcp + +import ( + "io" + "net" + "time" + + "github.com/pkg/errors" +) + +// ErrMaxReadBuffer returns when too many bytes was read on the io.Reader +var ErrMaxReadBuffer = errors.New("max read buffer reached") + +// ResetableLimitedReader is based on LimitedReader but allow to reset the byte read and return a specific +// error when we reach the limit. +type ResetableLimitedReader struct { + reader io.Reader + maxReadBuffer uint64 + byteRead uint64 +} + +// NewResetableLimitedReader returns a new ResetableLimitedReader +func NewResetableLimitedReader(reader io.Reader, maxReadBuffer uint64) *ResetableLimitedReader { + return &ResetableLimitedReader{ + reader: reader, + maxReadBuffer: maxReadBuffer, + } +} + +// Read reads the specified amount of byte +func (m *ResetableLimitedReader) Read(p []byte) (n int, err error) { + if m.byteRead >= m.maxReadBuffer { + return 0, ErrMaxReadBuffer + } + n, err = m.reader.Read(p) + m.byteRead += uint64(n) + return +} + +// Reset resets the number of byte read +func (m *ResetableLimitedReader) Reset() { + m.byteRead = 0 +} + +// IsMaxReadBufferErr returns true when the error is ErrMaxReadBuffer +func IsMaxReadBufferErr(err error) bool { + return err == ErrMaxReadBuffer +} + +// DeadlineReader allow read to a io.Reader to timeout, the timeout is refreshed on every read. +type DeadlineReader struct { + conn net.Conn + timeout time.Duration +} + +// NewDeadlineReader returns a new DeadlineReader +func NewDeadlineReader(c net.Conn, timeout time.Duration) *DeadlineReader { + return &DeadlineReader{ + conn: c, + timeout: timeout, + } +} + +// Read reads the number of bytes from the reader +func (d *DeadlineReader) Read(p []byte) (n int, err error) { + d.refresh() + return d.conn.Read(p) +} + +func (d *DeadlineReader) refresh() { + d.conn.SetDeadline(time.Now().Add(d.timeout)) +} diff --git a/filebeat/inputsource/tcp/conn_test.go b/filebeat/inputsource/tcp/conn_test.go new file mode 100644 index 00000000000..aaf2bea6a16 --- /dev/null +++ b/filebeat/inputsource/tcp/conn_test.go @@ -0,0 +1,43 @@ +package tcp + +import ( + "strings" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestResetableLimitedReader(t *testing.T) { + maxReadBuffer := 400 + + t.Run("WhenMaxReadIsReachedInMultipleRead", func(t *testing.T) { + r := strings.NewReader(randomString(maxReadBuffer * 2)) + m := NewResetableLimitedReader(r, uint64(maxReadBuffer)) + toRead := make([]byte, maxReadBuffer) + _, err := m.Read(toRead) + assert.NoError(t, err) + toRead = make([]byte, 300) + _, err = m.Read(toRead) + assert.Equal(t, ErrMaxReadBuffer, err) + }) + + t.Run("WhenMaxReadIsNotReached", func(t *testing.T) { + r := strings.NewReader(randomString(maxReadBuffer * 2)) + m := NewResetableLimitedReader(r, uint64(maxReadBuffer)) + toRead := make([]byte, maxReadBuffer) + _, err := m.Read(toRead) + assert.NoError(t, err) + }) + + t.Run("WhenResetIsCalled", func(t *testing.T) { + r := strings.NewReader(randomString(maxReadBuffer * 2)) + m := NewResetableLimitedReader(r, uint64(maxReadBuffer)) + toRead := make([]byte, maxReadBuffer) + _, err := m.Read(toRead) + assert.NoError(t, err) + m.Reset() + toRead = make([]byte, 300) + _, err = m.Read(toRead) + assert.NoError(t, err) + }) +} diff --git a/filebeat/inputsource/tcp/scan.go b/filebeat/inputsource/tcp/scan.go new file mode 100644 index 00000000000..4e9481581d2 --- /dev/null +++ b/filebeat/inputsource/tcp/scan.go @@ -0,0 +1,31 @@ +package tcp + +import ( + "bufio" + "bytes" +) + +// factoryDelimiter return a function to split line using a custom delimiter supporting multibytes +// delimiter, the delimiter is stripped from the returned value. +func factoryDelimiter(delimiter []byte) bufio.SplitFunc { + return func(data []byte, eof bool) (int, []byte, error) { + if eof && len(data) == 0 { + return 0, nil, nil + } + if i := bytes.Index(data, delimiter); i >= 0 { + return i + len(delimiter), dropDelimiter(data[0:i], delimiter), nil + } + if eof { + return len(data), dropDelimiter(data, delimiter), nil + } + return 0, nil, nil + } +} + +func dropDelimiter(data []byte, delimiter []byte) []byte { + if len(data) > len(delimiter) && + bytes.Equal(data[len(data)-len(delimiter):len(data)], delimiter) { + return data[0 : len(data)-len(delimiter)] + } + return data +} diff --git a/filebeat/inputsource/tcp/scan_test.go b/filebeat/inputsource/tcp/scan_test.go new file mode 100644 index 00000000000..87b28431965 --- /dev/null +++ b/filebeat/inputsource/tcp/scan_test.go @@ -0,0 +1,91 @@ +package tcp + +import ( + "bufio" + "strings" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestCustomDelimiter(t *testing.T) { + tests := []struct { + name string + text string + expected []string + delimiter []byte + }{ + { + name: "Multiple chars delimiter", + text: "hellobonjourholahey", + expected: []string{ + "hello", + "bonjour", + "hola", + "hey", + }, + delimiter: []byte(""), + }, + { + name: "Multiple chars delimiter with half starting delimiter", + text: "hellobonjourhey", + expected: []string{ + "hello", + "bonjour"), + }, + { + name: "Multiple chars delimiter with half ending delimiter", + text: "helloEND>holahey", + expected: []string{ + "hello", + "END>hola", + "hey", + }, + delimiter: []byte(""), + }, + { + name: "Delimiter end of string", + text: "hellobonjourholahey", + expected: []string{ + "hello", + "bonjour", + "hola", + "hey", + }, + delimiter: []byte(""), + }, + { + name: "Single char delimiter", + text: "hello;bonjour;hola;hey", + expected: []string{ + "hello", + "bonjour", + "hola", + "hey", + }, + delimiter: []byte(";"), + }, + { + name: "Empty string", + text: "", + expected: []string(nil), + delimiter: []byte(";"), + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + buf := strings.NewReader(test.text) + scanner := bufio.NewScanner(buf) + scanner.Split(factoryDelimiter(test.delimiter)) + var elements []string + for scanner.Scan() { + elements = append(elements, scanner.Text()) + } + assert.EqualValues(t, test.expected, elements) + }) + } +} diff --git a/filebeat/inputsource/tcp/server.go b/filebeat/inputsource/tcp/server.go new file mode 100644 index 00000000000..108f04baa3a --- /dev/null +++ b/filebeat/inputsource/tcp/server.go @@ -0,0 +1,167 @@ +package tcp + +import ( + "bufio" + "bytes" + "fmt" + "net" + "sync" + + "github.com/elastic/beats/libbeat/logp" +) + +// Metadata information about the remote host. +type Metadata struct { + RemoteAddr net.Addr +} + +// CallbackFunc receives new events read from the TCP socket. +type CallbackFunc = func(data []byte, metadata Metadata) + +// Server represent a TCP server +type Server struct { + sync.RWMutex + callback CallbackFunc + config *Config + Listener net.Listener + clients map[*client]struct{} + wg sync.WaitGroup + done chan struct{} + splitFunc bufio.SplitFunc + log *logp.Logger +} + +// New creates a new tcp server +func New( + callback CallbackFunc, + config *Config, +) (*Server, error) { + + if len(config.LineDelimiter) == 0 { + return nil, fmt.Errorf("empty line delimiter") + } + + sf := splitFunc([]byte(config.LineDelimiter)) + return &Server{ + config: config, + callback: callback, + clients: make(map[*client]struct{}, 0), + done: make(chan struct{}), + splitFunc: sf, + log: logp.NewLogger("tcp").With("address", config.Host), + }, nil +} + +// Start listen to the TCP socket. +func (s *Server) Start() error { + var err error + s.Listener, err = net.Listen("tcp", s.config.Host) + if err != nil { + return err + } + + s.log.Info("Started listening for TCP connection") + + s.wg.Add(1) + go func() { + defer s.wg.Done() + s.run() + }() + return nil +} + +// Run start and run a new TCP listener to receive new data +func (s *Server) run() { + for { + conn, err := s.Listener.Accept() + if err != nil { + select { + case <-s.done: + return + default: + s.log.Debugw("Can not accept the connection", "error", err) + continue + } + } + + client := newClient( + conn, + s.log, + s.callback, + s.splitFunc, + s.config.MaxMessageSize, + s.config.Timeout, + ) + + s.log.Debugw("New client", "address", conn.RemoteAddr(), "total", s.clientsCount()) + s.wg.Add(1) + go func() { + defer logp.Recover("recovering from a tcp client crash") + defer s.wg.Done() + defer conn.Close() + + s.registerClient(client) + defer s.unregisterClient(client) + + err := client.handle() + if err != nil { + s.log.Debugw("Client error", "error", err) + } + + s.log.Debugw("Client disconnected", "address", conn.RemoteAddr(), "total", s.clientsCount()) + }() + } +} + +// Stop stops accepting new incoming TCP connection and close any active clients +func (s *Server) Stop() { + s.log.Info("Stopping TCP server") + close(s.done) + s.Listener.Close() + for _, client := range s.allClients() { + client.close() + } + s.wg.Wait() + s.log.Info("TCP server stopped") +} + +func (s *Server) registerClient(client *client) { + s.Lock() + defer s.Unlock() + s.clients[client] = struct{}{} +} + +func (s *Server) unregisterClient(client *client) { + s.Lock() + defer s.Unlock() + delete(s.clients, client) +} + +func (s *Server) allClients() []*client { + s.RLock() + defer s.RUnlock() + currentClients := make([]*client, len(s.clients)) + idx := 0 + for client := range s.clients { + currentClients[idx] = client + idx++ + } + return currentClients +} + +func (s *Server) clientsCount() int { + s.RLock() + defer s.RUnlock() + return len(s.clients) +} + +func splitFunc(lineDelimiter []byte) bufio.SplitFunc { + ld := []byte(lineDelimiter) + if bytes.Equal(ld, []byte("\n")) { + // This will work for most usecases and will also strip \r if present. + // CustomDelimiter, need to match completely and the delimiter will be completely removed from + // the returned byte slice + return bufio.ScanLines + } + return factoryDelimiter(ld) +} diff --git a/filebeat/inputsource/tcp/server_test.go b/filebeat/inputsource/tcp/server_test.go new file mode 100644 index 00000000000..7aaf200ece8 --- /dev/null +++ b/filebeat/inputsource/tcp/server_test.go @@ -0,0 +1,244 @@ +package tcp + +import ( + "fmt" + "math/rand" + "net" + "strings" + "testing" + "time" + + "github.com/dustin/go-humanize" + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/common" +) + +var defaultConfig = Config{ + LineDelimiter: "\n", + Timeout: time.Minute * 5, + MaxMessageSize: 20 * humanize.MiByte, +} + +type info struct { + message string + mt Metadata +} + +func TestErrorOnEmptyLineDelimiter(t *testing.T) { + cfg := map[string]interface{}{ + "line_delimiter": "", + } + + c, _ := common.NewConfigFrom(cfg) + config := defaultConfig + err := c.Unpack(&config) + assert.Error(t, err) +} + +func TestReceiveEventsAndMetadata(t *testing.T) { + expectedMessages := generateMessages(5, 100) + largeMessages := generateMessages(10, 4096) + + tests := []struct { + name string + cfg map[string]interface{} + expectedMessages []string + messageSent string + }{ + { + name: "NewLine", + cfg: map[string]interface{}{}, + expectedMessages: expectedMessages, + messageSent: strings.Join(expectedMessages, "\n"), + }, + { + name: "NewLineWithCR", + cfg: map[string]interface{}{}, + expectedMessages: expectedMessages, + messageSent: strings.Join(expectedMessages, "\r\n"), + }, + { + name: "CustomDelimiter", + cfg: map[string]interface{}{ + "line_delimiter": ";", + }, + expectedMessages: expectedMessages, + messageSent: strings.Join(expectedMessages, ";"), + }, + { + name: "MultipleCharsCustomDelimiter", + cfg: map[string]interface{}{ + "line_delimiter": "", + }, + expectedMessages: expectedMessages, + messageSent: strings.Join(expectedMessages, ""), + }, + { + name: "SingleCharCustomDelimiterMessageWithoutBoudaries", + cfg: map[string]interface{}{ + "line_delimiter": ";", + }, + expectedMessages: []string{"hello"}, + messageSent: "hello", + }, + { + name: "MultipleCharCustomDelimiterMessageWithoutBoundaries", + cfg: map[string]interface{}{ + "line_delimiter": "", + }, + expectedMessages: []string{"hello"}, + messageSent: "hello", + }, + { + name: "NewLineMessageWithoutBoundaries", + cfg: map[string]interface{}{ + "line_delimiter": "\n", + }, + expectedMessages: []string{"hello"}, + messageSent: "hello", + }, + { + name: "NewLineLargeMessagePayload", + cfg: map[string]interface{}{ + "line_delimiter": "\n", + }, + expectedMessages: largeMessages, + messageSent: strings.Join(largeMessages, "\n"), + }, + { + name: "CustomLargeMessagePayload", + cfg: map[string]interface{}{ + "line_delimiter": ";", + }, + expectedMessages: largeMessages, + messageSent: strings.Join(largeMessages, ";"), + }, + { + name: "MaxReadBufferReached", + cfg: map[string]interface{}{}, + expectedMessages: []string{}, + messageSent: randomString(900000), + }, + { + name: "MaxReadBufferReachedUserConfigured", + cfg: map[string]interface{}{ + "max_read_message": 50000, + }, + expectedMessages: []string{}, + messageSent: randomString(600000), + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ch := make(chan *info, len(test.expectedMessages)) + defer close(ch) + to := func(message []byte, mt Metadata) { + ch <- &info{message: string(message), mt: mt} + } + test.cfg["host"] = "localhost:0" + cfg, _ := common.NewConfigFrom(test.cfg) + config := defaultConfig + err := cfg.Unpack(&config) + if !assert.NoError(t, err) { + return + } + server, err := New(to, &config) + if !assert.NoError(t, err) { + return + } + err = server.Start() + if !assert.NoError(t, err) { + return + } + defer server.Stop() + + conn, err := net.Dial("tcp", server.Listener.Addr().String()) + assert.NoError(t, err) + fmt.Fprint(conn, test.messageSent) + conn.Close() + + var events []*info + + for len(events) < len(test.expectedMessages) { + select { + case event := <-ch: + events = append(events, event) + default: + } + } + + for idx, e := range events { + assert.Equal(t, test.expectedMessages[idx], e.message) + assert.NotNil(t, e.mt.RemoteAddr) + } + }) + } +} + +func TestReceiveNewEventsConcurrently(t *testing.T) { + workers := 4 + eventsCount := 100 + ch := make(chan *info, eventsCount*workers) + defer close(ch) + to := func(message []byte, mt Metadata) { + ch <- &info{message: string(message), mt: mt} + } + cfg, err := common.NewConfigFrom(map[string]interface{}{"host": ":0"}) + if !assert.NoError(t, err) { + return + } + config := defaultConfig + err = cfg.Unpack(&config) + if !assert.NoError(t, err) { + return + } + server, err := New(to, &config) + if !assert.NoError(t, err) { + return + } + err = server.Start() + if !assert.NoError(t, err) { + return + } + defer server.Stop() + + samples := generateMessages(eventsCount, 1024) + for w := 0; w < workers; w++ { + go func() { + conn, err := net.Dial("tcp", server.Listener.Addr().String()) + defer conn.Close() + assert.NoError(t, err) + for _, sample := range samples { + fmt.Fprintln(conn, sample) + } + }() + } + + var events []*info + for len(events) < eventsCount*workers { + select { + case event := <-ch: + events = append(events, event) + default: + } + } +} + +func randomString(l int) string { + charsets := []byte("abcdefghijklmnopqrstuvwzyzABCDEFGHIJKLMNOPQRSTUVWZYZ0123456789") + message := make([]byte, l) + for i := range message { + message[i] = charsets[rand.Intn(len(charsets))] + } + return string(message) +} + +func generateMessages(c int, l int) []string { + messages := make([]string, c) + for i := range messages { + messages[i] = randomString(l) + } + return messages +} diff --git a/filebeat/tests/system/test_tcp.py b/filebeat/tests/system/test_tcp.py new file mode 100644 index 00000000000..d6788d164ea --- /dev/null +++ b/filebeat/tests/system/test_tcp.py @@ -0,0 +1,68 @@ +from filebeat import BaseTest +import socket + + +class Test(BaseTest): + """ + Test filebeat TCP input + """ + + def test_tcp_with_newline_delimiter(self): + """ + Test TCP input with a new line delimiter + """ + self.send_events_with_delimiter("\n") + + def test_tcp_with_custom_char_delimiter(self): + """ + Test TCP input with a custom single char delimiter + """ + self.send_events_with_delimiter(";") + + def test_tcp_with_custom_word_delimiter(self): + """ + Test TCP input with a custom single char delimiter + """ + self.send_events_with_delimiter("") + + def send_events_with_delimiter(self, delimiter): + host = "127.0.0.1" + port = 8080 + input_raw = """ +- type: tcp + host: "{}:{}" + enabled: true +""" + + # Use default of \n and stripping \r + if delimiter is not "": + input_raw += "\n line_delimiter: {}".format(delimiter) + + input_raw = input_raw.format(host, port) + + self.render_config_template( + input_raw=input_raw, + inputs=False, + ) + + filebeat = self.start_beat() + + self.wait_until(lambda: self.log_contains("Started listening for TCP connection")) + + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # TCP + sock.connect((host, port)) + + for n in range(0, 2): + sock.send("Hello World: " + str(n) + delimiter) + + self.wait_until(lambda: self.output_count(lambda x: x >= 2)) + + filebeat.check_kill_and_wait() + + output = self.read_output() + + assert len(output) == 2 + assert output[0]["prospector.type"] == "tcp" + assert output[0]["input.type"] == "tcp" + + sock.close() From 194d48acf48bfac31b7c330803864feea9d7a11f Mon Sep 17 00:00:00 2001 From: ph Date: Wed, 4 Apr 2018 12:06:39 -0400 Subject: [PATCH 2/3] uses a mutex instead of atomic bool --- filebeat/input/tcp/input.go | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/filebeat/input/tcp/input.go b/filebeat/input/tcp/input.go index 2153d4154a0..29dde22ff34 100644 --- a/filebeat/input/tcp/input.go +++ b/filebeat/input/tcp/input.go @@ -1,6 +1,7 @@ package tcp import ( + "sync" "time" "github.com/elastic/beats/filebeat/channel" @@ -10,7 +11,6 @@ import ( "github.com/elastic/beats/filebeat/util" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/common/atomic" "github.com/elastic/beats/libbeat/common/cfgwarn" "github.com/elastic/beats/libbeat/logp" ) @@ -24,8 +24,9 @@ func init() { // Input for TCP connection type Input struct { + sync.Mutex server *tcp.Server - started atomic.Bool + started bool outlet channel.Outleter config *config log *logp.Logger @@ -64,7 +65,7 @@ func NewInput( return &Input{ server: server, - started: atomic.MakeBool(false), + started: false, outlet: out, config: &config, log: logp.NewLogger("tcp input").With(config.Config.Host), @@ -73,13 +74,16 @@ func NewInput( // Run start a TCP input func (p *Input) Run() { - if !p.started.Load() { + p.Lock() + defer p.Unlock() + + if !p.started { p.log.Info("Starting TCP input") err := p.server.Start() if err != nil { p.log.Errorw("Error starting the TCP server", "error", err) } - p.started.Swap(true) + p.started = true } } @@ -87,8 +91,11 @@ func (p *Input) Run() { func (p *Input) Stop() { p.log.Info("Stopping TCP input") defer p.outlet.Close() - defer p.started.Swap(false) + p.Lock() + defer p.Unlock() + p.server.Stop() + p.started = false } // Wait stop the current server From 99e469eb8e607845f45d9a751ecdea85f505b12d Mon Sep 17 00:00:00 2001 From: ph Date: Wed, 4 Apr 2018 13:13:32 -0400 Subject: [PATCH 3/3] change log order. --- filebeat/input/tcp/input.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/filebeat/input/tcp/input.go b/filebeat/input/tcp/input.go index 29dde22ff34..061b4318dee 100644 --- a/filebeat/input/tcp/input.go +++ b/filebeat/input/tcp/input.go @@ -89,11 +89,11 @@ func (p *Input) Run() { // Stop stops TCP server func (p *Input) Stop() { - p.log.Info("Stopping TCP input") defer p.outlet.Close() p.Lock() defer p.Unlock() + p.log.Info("Stopping TCP input") p.server.Stop() p.started = false }