Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: TCP Input #6700

Merged
merged 3 commits into from
Apr 5, 2018
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions filebeat/input/tcp/input.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package tcp

import (
"sync"
"time"

"github.com/elastic/beats/filebeat/channel"
Expand All @@ -10,7 +11,6 @@ import (
"github.com/elastic/beats/filebeat/util"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/atomic"
"github.com/elastic/beats/libbeat/common/cfgwarn"
"github.com/elastic/beats/libbeat/logp"
)
Expand All @@ -24,8 +24,9 @@ func init() {

// Input for TCP connection
type Input struct {
sync.Mutex
server *tcp.Server
started atomic.Bool
started bool
outlet channel.Outleter
config *config
log *logp.Logger
Expand Down Expand Up @@ -64,7 +65,7 @@ func NewInput(

return &Input{
server: server,
started: atomic.MakeBool(false),
started: false,
outlet: out,
config: &config,
log: logp.NewLogger("tcp input").With(config.Config.Host),
Expand All @@ -73,22 +74,28 @@ func NewInput(

// Run start a TCP input
func (p *Input) Run() {
if !p.started.Load() {
p.Lock()
defer p.Unlock()

if !p.started {
p.log.Info("Starting TCP input")
err := p.server.Start()
if err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this return the error?

Copy link
Contributor Author

@ph ph Apr 3, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would require changes in the upstream interface, I can create a followup issue on that?

// Input is the interface common to all input
type Input interface {
	Run()
	Stop()
	Wait()
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general the way we use Run() is if it's blocking and often returns an error. In contrast we use Start() if it's not blocking. I would expect all three in the above interface to potentially return an error.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general the way we use Run() is if it's blocking and often returns an error. In contrast we use Start() if it's not blocking. I would expect all three in the above interface to potentially return an error.

Agree on that point but I think changing that interface is out of scope for this PR, so do you agree the plan of action is to do a followup issue.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, if we follow up on this ;-)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

created #6771

p.log.Errorw("Error starting the TCP server", "error", err)
}
p.started.Swap(true)
p.started = true
}
}

// Stop stops TCP server
func (p *Input) Stop() {
p.log.Info("Stopping TCP input")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logging is not congruent with the UDP input. In UDP the message is logged after acquiring the lock. IDK which order, but I would like the behavior to be the same.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, done in the next commit.

defer p.outlet.Close()
defer p.started.Swap(false)
p.Lock()
defer p.Unlock()

p.server.Stop()
p.started = false
}

// Wait stop the current server
Expand Down