-
Notifications
You must be signed in to change notification settings - Fork 4.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Allow to receive new events via TCP, this will create a new event per line and add some useful information about the connected client to the evant. This input is marked as **experimental**. This input expose the following settings: - `line_delimiter`: The characters used to split incoming events, by default it will split on `\n` and will also strip both `\n` and `\r`. You can also use any characters like `;` and you can also used multiple characters delimiter like `<END>`, the delimiter tokens will always be removed from the string. - `max_message_size`: This is a number of bytes that a client can buffer in memory before finding a new message, if the limit is reached the connection is killed and the event is logged. This is to prevent rogue clients to DoS attack by consuming all the available memory. The default values is 20971520. - `timeout`: The server will close any client that reach this inactivity timeout. Ref: #5862
- Loading branch information
1 parent
e25faba
commit 32ba1c2
Showing
18 changed files
with
1,097 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
:type: tcp | ||
|
||
[id="{beatname_lc}-input-{type}"] | ||
=== TCP input | ||
|
||
++++ | ||
<titleabbrev>TCP</titleabbrev> | ||
++++ | ||
|
||
Use the `TCP` input to read events over TCP. | ||
|
||
Example configuration: | ||
|
||
["source","yaml",subs="attributes"] | ||
---- | ||
{beatname_lc}.inputs: | ||
- type: tcp | ||
max_message_size: 10240 | ||
host: "localhost:9000" | ||
---- | ||
|
||
|
||
==== Configuration options | ||
|
||
The `tcp` input supports the following configuration options plus the | ||
<<{beatname_lc}-input-{type}-common-options>> described later. | ||
|
||
[float] | ||
[id="{beatname_lc}-input-{type}-max-message-size"] | ||
==== `max_message_size` | ||
|
||
The maximum size of the message received over TCP. The default is `20MiB`. | ||
|
||
[float] | ||
[id="{beatname_lc}-input-{type}-host"] | ||
==== `host` | ||
|
||
The host and TCP port to listen on for event streams. | ||
|
||
[float] | ||
[id="{beatname_lc}-input-{type}-line-delimiter"] | ||
==== `line_delimiter` | ||
|
||
Specify the characters used to split the incoming events. The default is '\n'. | ||
|
||
[float] | ||
[id="{beatname_lc}-input-{type}-timeout"] | ||
==== `timeout` | ||
|
||
The number of seconds of inactivity before a remote connection is closed. The default is `300s`. | ||
|
||
[id="{beatname_lc}-input-{type}-common-options"] | ||
include::../inputs/input-common-options.asciidoc[] | ||
|
||
:type!: |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
package tcp | ||
|
||
import ( | ||
"time" | ||
|
||
"github.com/dustin/go-humanize" | ||
|
||
"github.com/elastic/beats/filebeat/harvester" | ||
"github.com/elastic/beats/filebeat/inputsource/tcp" | ||
) | ||
|
||
type config struct { | ||
tcp.Config `config:",inline"` | ||
harvester.ForwarderConfig `config:",inline"` | ||
} | ||
|
||
var defaultConfig = config{ | ||
ForwarderConfig: harvester.ForwarderConfig{ | ||
Type: "tcp", | ||
}, | ||
Config: tcp.Config{ | ||
LineDelimiter: "\n", | ||
Timeout: time.Minute * 5, | ||
MaxMessageSize: 20 * humanize.MiByte, | ||
}, | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,116 @@ | ||
package tcp | ||
|
||
import ( | ||
"sync" | ||
"time" | ||
|
||
"github.com/elastic/beats/filebeat/channel" | ||
"github.com/elastic/beats/filebeat/harvester" | ||
"github.com/elastic/beats/filebeat/input" | ||
"github.com/elastic/beats/filebeat/inputsource/tcp" | ||
"github.com/elastic/beats/filebeat/util" | ||
"github.com/elastic/beats/libbeat/beat" | ||
"github.com/elastic/beats/libbeat/common" | ||
"github.com/elastic/beats/libbeat/common/cfgwarn" | ||
"github.com/elastic/beats/libbeat/logp" | ||
) | ||
|
||
func init() { | ||
err := input.Register("tcp", NewInput) | ||
if err != nil { | ||
panic(err) | ||
} | ||
} | ||
|
||
// Input for TCP connection | ||
type Input struct { | ||
sync.Mutex | ||
server *tcp.Server | ||
started bool | ||
outlet channel.Outleter | ||
config *config | ||
log *logp.Logger | ||
} | ||
|
||
// NewInput creates a new TCP input | ||
func NewInput( | ||
cfg *common.Config, | ||
outlet channel.Factory, | ||
context input.Context, | ||
) (input.Input, error) { | ||
cfgwarn.Experimental("TCP input type is used") | ||
|
||
out, err := outlet(cfg, context.DynamicFields) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
forwarder := harvester.NewForwarder(out) | ||
|
||
config := defaultConfig | ||
err = cfg.Unpack(&config) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
cb := func(data []byte, metadata tcp.Metadata) { | ||
event := createEvent(data, metadata) | ||
forwarder.Send(event) | ||
} | ||
|
||
server, err := tcp.New(cb, &config.Config) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
return &Input{ | ||
server: server, | ||
started: false, | ||
outlet: out, | ||
config: &config, | ||
log: logp.NewLogger("tcp input").With(config.Config.Host), | ||
}, nil | ||
} | ||
|
||
// Run start a TCP input | ||
func (p *Input) Run() { | ||
p.Lock() | ||
defer p.Unlock() | ||
|
||
if !p.started { | ||
p.log.Info("Starting TCP input") | ||
err := p.server.Start() | ||
if err != nil { | ||
p.log.Errorw("Error starting the TCP server", "error", err) | ||
} | ||
p.started = true | ||
} | ||
} | ||
|
||
// Stop stops TCP server | ||
func (p *Input) Stop() { | ||
defer p.outlet.Close() | ||
p.Lock() | ||
defer p.Unlock() | ||
|
||
p.log.Info("Stopping TCP input") | ||
p.server.Stop() | ||
p.started = false | ||
} | ||
|
||
// Wait stop the current server | ||
func (p *Input) Wait() { | ||
p.Stop() | ||
} | ||
|
||
func createEvent(raw []byte, metadata tcp.Metadata) *util.Data { | ||
data := util.NewData() | ||
data.Event = beat.Event{ | ||
Timestamp: time.Now(), | ||
Fields: common.MapStr{ | ||
"message": string(raw), | ||
"source": metadata.RemoteAddr.String(), | ||
}, | ||
} | ||
return data | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
package tcp | ||
|
||
import ( | ||
"net" | ||
"testing" | ||
|
||
"github.com/stretchr/testify/assert" | ||
|
||
"github.com/elastic/beats/filebeat/inputsource/tcp" | ||
) | ||
|
||
func TestCreateEvent(t *testing.T) { | ||
hello := "hello world" | ||
ip := "127.0.0.1" | ||
parsedIP := net.ParseIP(ip) | ||
addr := &net.IPAddr{IP: parsedIP, Zone: ""} | ||
|
||
message := []byte(hello) | ||
mt := tcp.Metadata{RemoteAddr: addr} | ||
|
||
data := createEvent(message, mt) | ||
event := data.GetEvent() | ||
|
||
m, err := event.GetValue("message") | ||
assert.NoError(t, err) | ||
assert.Equal(t, string(message), m) | ||
|
||
from, _ := event.GetValue("source") | ||
assert.Equal(t, ip, from) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
package tcp | ||
|
||
import ( | ||
"bufio" | ||
"net" | ||
"time" | ||
|
||
"github.com/pkg/errors" | ||
|
||
"github.com/elastic/beats/libbeat/logp" | ||
) | ||
|
||
// Client is a remote client. | ||
type client struct { | ||
conn net.Conn | ||
log *logp.Logger | ||
callback CallbackFunc | ||
done chan struct{} | ||
metadata Metadata | ||
splitFunc bufio.SplitFunc | ||
maxReadMessage size | ||
timeout time.Duration | ||
} | ||
|
||
func newClient( | ||
conn net.Conn, | ||
log *logp.Logger, | ||
callback CallbackFunc, | ||
splitFunc bufio.SplitFunc, | ||
maxReadMessage size, | ||
timeout time.Duration, | ||
) *client { | ||
client := &client{ | ||
conn: conn, | ||
log: log.With("address", conn.RemoteAddr()), | ||
callback: callback, | ||
done: make(chan struct{}), | ||
splitFunc: splitFunc, | ||
maxReadMessage: maxReadMessage, | ||
timeout: timeout, | ||
metadata: Metadata{ | ||
RemoteAddr: conn.RemoteAddr(), | ||
}, | ||
} | ||
return client | ||
} | ||
|
||
func (c *client) handle() error { | ||
r := NewResetableLimitedReader(NewDeadlineReader(c.conn, c.timeout), uint64(c.maxReadMessage)) | ||
buf := bufio.NewReader(r) | ||
scanner := bufio.NewScanner(buf) | ||
scanner.Split(c.splitFunc) | ||
|
||
for scanner.Scan() { | ||
err := scanner.Err() | ||
if err != nil { | ||
// we are forcing a close on the socket, lets ignore any error that could happen. | ||
select { | ||
case <-c.done: | ||
break | ||
default: | ||
} | ||
// This is a user defined limit and we should notify the user. | ||
if IsMaxReadBufferErr(err) { | ||
c.log.Errorw("client errors", "error", err) | ||
} | ||
return errors.Wrap(err, "tcp client error") | ||
} | ||
r.Reset() | ||
c.callback(scanner.Bytes(), c.metadata) | ||
} | ||
return nil | ||
} | ||
|
||
func (c *client) close() { | ||
close(c.done) | ||
c.conn.Close() | ||
} |
Oops, something went wrong.