Feature: TCP Input #6700
I've squashed everything into a single commit, so it's ready for review. There is one failing test related to Metricbeat and Apache 2.4.12.
I've created #6722 to tackle this.
jenkins test this please
I haven't had time to go through the full PR in detail yet, but thought I'd already send the first comments.
CHANGELOG.asciidoc
@@ -51,6 +51,8 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di
- Remove the undefined `username` option from the Redis input and clarify the documentation. {pull}6662[6662]
- Add validation for Stdin, when Filebeat is configured with Stdin and any other inputs, Filebeat
will now refuses to start. {pull}6463[6463]
- Refactor the usage of prospector to input in the YAML reference {pull}6121[6121]
This line is not related to this PR
CHANGELOG.asciidoc
@@ -51,6 +51,8 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di
- Remove the undefined `username` option from the Redis input and clarify the documentation. {pull}6662[6662]
- Add validation for Stdin, when Filebeat is configured with Stdin and any other inputs, Filebeat
will now refuses to start. {pull}6463[6463]
- Refactor the usage of prospector to input in the YAML reference {pull}6121[6121]
- Addition of the TCP input {pull}6266[6266]
PR number should be adjusted.
#enabled: false

# The host and port to receive the new event
#host: "localhost:9000"
I remember @andrewkroh suggested not having a default and requiring the user to specify it?
We currently don't have any default for this value. I've provided an example host/port, but it's not a hard default.
var defaultConfig = config{
	ForwarderConfig: harvester.ForwarderConfig{
		Type: "tcp",
	},
	Config: tcp.Config{
		LineDelimiter:  "\n",
		Timeout:        time.Minute * 5,
		MaxMessageSize: 20 * 1024 * 1024,
	},
}
Following your comment, I did a small change and added a more explicit error for the host.
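To illustrate the "no hard default, explicit error" behavior discussed above, here is a minimal sketch. The `tcpConfig` type, `defaultTCPConfig`, `Validate`, and the error text are hypothetical stand-ins and not the PR's actual code; only the default values are taken from the snippet above.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// tcpConfig is a hypothetical, trimmed-down stand-in for the input's config.
type tcpConfig struct {
	Host           string
	LineDelimiter  string
	Timeout        time.Duration
	MaxMessageSize uint64
}

// defaultTCPConfig mirrors the defaults quoted above and leaves Host empty
// on purpose: the user has to provide it.
func defaultTCPConfig() tcpConfig {
	return tcpConfig{
		LineDelimiter:  "\n",
		Timeout:        5 * time.Minute,
		MaxMessageSize: 20 * 1024 * 1024,
	}
}

// Validate rejects a configuration that does not name a host to bind to.
// The error message is an assumption made for this sketch.
func (c tcpConfig) Validate() error {
	if c.Host == "" {
		return errors.New("need to specify the host using the `host:port` syntax")
	}
	return nil
}

func main() {
	cfg := defaultTCPConfig()
	fmt.Println(cfg.Validate()) // non-nil error: host is missing

	cfg.Host = "localhost:9000"
	fmt.Println(cfg.Validate()) // <nil>
}
```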
[id="{beatname_lc}-input-{type}-max-message-size"]
==== `max_message_size`

The maximum size of the message received over TCP. The default is `20971520`.
Perhaps mention that this is 20MB
Switched to go-humanize instead, so 20MiB is allowed.
[float]
[id="{beatname_lc}-input-{type}-timeout"]
==== `timeout`
I didn't see this config in the reference config file. Also make sure to use 300s (with unit) and not just 300. See other timeout config options. Since we had issues in the past without units, we now always try to have units.
filebeat/input/tcp/input.go
if !p.started.Load() {
p.log.Info("Starting TCP input")
err := p.server.Start()
if err != nil {
Shouldn't this return the error?
This would require changes in the upstream interface, I can create a followup issue on that?
// Input is the interface common to all input
type Input interface {
	Run()
	Stop()
	Wait()
}
In general the way we use Run() is if it's blocking and often returns an error. In contrast we use Start() if it's not blocking. I would expect all three in the above interface to potentially return an error.
In general the way we use Run() is if it's blocking and often returns an error. In contrast we use Start() if it's not blocking. I would expect all three in the above interface to potentially return an error.
Agreed on that point, but I think changing that interface is out of scope for this PR. Do you agree that the plan of action is a follow-up issue?
Yes, if we follow up on this ;-)
created #6771
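For the follow-up, the idea of a blocking Run() that reports failures could look roughly like the sketch below. The `Runner` interface, `failingInput`, and `startInput` are hypothetical names used only for illustration; this is not the interface proposed in #6771.

```go
package main

import (
	"errors"
	"fmt"
)

// Runner is a hypothetical variant of the Input interface in which the
// lifecycle methods can report failures to the caller.
type Runner interface {
	Run() error  // blocking; returns when the input stops or fails
	Stop() error // asks the input to shut down
}

// failingInput simulates an input whose server cannot bind its address.
type failingInput struct{}

func (failingInput) Run() error  { return errors.New("listen tcp: address already in use") }
func (failingInput) Stop() error { return nil }

// startInput propagates the startup error instead of only logging it.
func startInput(r Runner) error {
	if err := r.Run(); err != nil {
		return fmt.Errorf("tcp input: %w", err)
	}
	return nil
}

func main() {
	if err := startInput(failingInput{}); err != nil {
		fmt.Println("startup failed:", err)
	}
}
```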
filebeat/input/tcp/input.go
data.Event = beat.Event{
Timestamp: time.Now(),
Meta: common.MapStr{
"ip_address": metadata.IPAddress,
I suggest we have this information also in each event under something like source.ip or similar?
+1 for source.ip when following ECS, but currently this will clash with source, which is the original file that was read. So using source.ip will need to wait for 7.0. Maybe we can use client.ip?
description: > source
The file from which the line was read. This field contains the absolute path to the file.
For example: `/var/log/system.log`.
The collision is unfortunate. Under ECS source will be moving to file.path (for the log input). I think we have three options:
1. Put the remote IP and port into source (e.g. source: 127.0.0.1:38849).
2. Leave it under @metadata. The value will be there if someone really needs it (they can use the rename processor to get it stored in ES). Then in 7.0 we move this to source.ip. Because it's tucked away in @metadata the impact of renaming it in 7.0 is minimized.
3. Store it in client.ip and rename it to source.ip for 7.0. This adds more fields that need to be renamed for 7.0, but on the plus side the value is more visible if people want to use it.
I like number 1.
Option 1 makes sense, I didn't see it like that.
This is effectively the source file for it.
{
  "@timestamp": "2018-04-03T20:32:55.908Z",
  "@metadata": {
    "beat": "filebeat",
    "type": "doc",
    "version": "7.0.0-alpha1"
  },
  "source": "127.0.0.1:61656",
  "prospector": {
    "type": "tcp"
  },
  "input": {
    "type": "tcp"
  },
  "beat": {
    "name": "sashimi",
    "hostname": "sashimi",
    "version": "7.0.0-alpha1"
  },
  "message": "heee"
}
}

// NewResetableLimitedReader returns a new ResetableLimitedReader
func NewResetableLimitedReader(reader io.Reader, maxReadBuffer uint64) *ResetableLimitedReader {
We already have the harvester/reader/ package. I wonder if these have the same interface and should go there.
I would prefer to keep them together until it's reused by something else.
filebeat/inputsource/tcp/scan.go

// ScanDelimiter return a function to split line using a custom delimiter, the delimiter
// is stripped from the returned value.
func scanDelimiter(delimiter []byte) bufio.SplitFunc {
Is this similar to the strip_newline reader?
Not really, I will rename this method to factoryDelimiter(), which is what it does.
It returns a custom split func that supports a custom delimiter, which can be a multibyte delimiter.
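A minimal sketch of such a factory, built on the standard `bufio.SplitFunc` contract, is shown below. It is not the PR's code: `splitOn` and `splitAll` are hypothetical names, the sketch assumes a non-empty delimiter, and it does not reproduce the extra `\r` stripping described for the default newline delimiter.

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
	"strings"
)

// splitOn returns a bufio.SplitFunc that cuts the stream at each occurrence
// of delimiter and strips the delimiter from the returned token.
// Assumption for this sketch: delimiter is not empty.
func splitOn(delimiter []byte) bufio.SplitFunc {
	return func(data []byte, atEOF bool) (int, []byte, error) {
		if atEOF && len(data) == 0 {
			return 0, nil, nil
		}
		if i := bytes.Index(data, delimiter); i >= 0 {
			// Advance past the delimiter, return the bytes before it.
			return i + len(delimiter), data[:i], nil
		}
		if atEOF {
			// Final token without a trailing delimiter.
			return len(data), data, nil
		}
		// No delimiter yet: ask the scanner for more data.
		return 0, nil, nil
	}
}

// splitAll is a small helper that collects every token from input.
func splitAll(input, delimiter string) []string {
	scanner := bufio.NewScanner(strings.NewReader(input))
	scanner.Split(splitOn([]byte(delimiter)))
	var out []string
	for scanner.Scan() {
		out = append(out, scanner.Text())
	}
	return out
}

func main() {
	fmt.Println(splitAll("one<END>two<END>three", "<END>"))
}
```

Note that `bufio.Scanner` has its own token size limit, which is separate from the `max_message_size` setting discussed in this PR.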
filebeat/inputsource/tcp/client.go
timeout time.Duration
}

// NewClient returns a new client instance for the remote connection.
s/NewClient/newClient so it matches the name.
filebeat/inputsource/tcp/client.go
maxReadMessage: maxReadMessage,
timeout: timeout,
metadata: Metadata{
IPAddress: conn.RemoteAddr().String(),
The way this is populated it's more than an IP address, so I think the name is confusing. I'd go with RemoteAddr. Or split it here and have two fields, RemoteIP and RemotePort (which align to the future source.ip and source.port).
If I understand this correctly, the IP address provided here is in the end the host that sent the log message. So my suggestion would be to use host.ip for this. So the host.ip would be the same if the file is read locally or sent over TCP.
For the port, I'm not sure if this is relevant in this context, and it could be skipped. If we add the port, it would be the port of the service running, so service.port?
Maybe you are right: the port isn't necessary in that context and we can drop it. I would say the same for the port of the running service.
@ruflin For the future I still believe that source.ip is what we should aim for. I think using source as the IP until ECS is deployed is a good compromise that avoids creating new fields that we would have to deprecate right away.
Agreement on zoom with @andrewkroh @ruflin @ph:
- Set remote IP + remote port to the source field in the event.
- The field will be a keyword until we have ECS rolling.
- Metadata uses RemoteAddr instead of IPAddress.
Example format: 127.0.0.1:99998
Allow to receive new events via TCP, this will create a new event per line and add some useful information about the connected client to the event. This input is marked as **experimental**.
This input exposes the following settings:
- `line_delimiter`: The characters used to split incoming events. By default it splits on `\n` and also strips both `\n` and `\r`. You can use any character, such as `;`, or a multi-character delimiter such as `<END>`; the delimiter tokens are always removed from the string.
- `max_message_size`: The number of bytes a client can buffer in memory before a new message is found. If the limit is reached, the connection is killed and the event is logged. This prevents rogue clients from mounting a DoS attack by consuming all the available memory. The default value is 20971520.
- `timeout`: The server closes any client that reaches this inactivity timeout.
Ref: elastic#5862
filebeat/input/tcp/input.go
p.log.Info("Starting TCP input")
err := p.server.Start()
if err != nil {
p.log.Errorw("Error starting the TCP server", "error", err)
}
p.started.Swap(true)
p.started = true
}
}

// Stop stops TCP server
func (p *Input) Stop() {
p.log.Info("Stopping TCP input")
The logging is not congruent with the UDP input. In UDP the message is logged after acquiring the lock. IDK which order, but I would like the behavior to be the same.
Correct, done in the next commit.
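As a rough illustration of the ordering discussed here, the sketch below logs only after the lock is held, so the message and the state change cannot interleave with a concurrent call. The `input` type and its `logs` slice (standing in for a real logger) are hypothetical and not the PR's code.

```go
package main

import (
	"fmt"
	"sync"
)

// input is a hypothetical stand-in for the TCP input.
type input struct {
	sync.Mutex
	started bool
	logs    []string // stands in for the logger so the order is observable
}

// Stop acquires the lock first and logs afterwards, matching the order the
// reviewer describes for the UDP input.
func (p *input) Stop() {
	p.Lock()
	defer p.Unlock()

	p.logs = append(p.logs, "Stopping TCP input")
	p.started = false
}

func main() {
	p := &input{started: true}
	p.Stop()
	fmt.Println(p.logs, p.started)
}
```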
Allow to receive new events via TCP, this will create a new event per line and add some useful information about the connected client to the event. This input is marked as experimental.
This input exposes the following settings:
- `line_delimiter`: The characters used to split incoming events. By default it splits on `\n` and also strips both `\n` and `\r`. You can use any character, such as `;`, or a multi-character delimiter such as `<END>`; the delimiter tokens are always removed from the string.
- `max_message_size`: The number of bytes a client can buffer in memory before a new message is found. If the limit is reached, the connection is killed and the event is logged. This prevents rogue clients from mounting a DoS attack by consuming all the available memory. The default value is 20971520.
- `timeout`: The server closes any client that reaches this inactivity timeout.
- `host`: The IP and port to bind to.
This is a new PR based on #6266.
A few things are different: