Skip to content

Commit

Permalink
Move TCP UDP start up into server.Start() (#4903)
Browse files Browse the repository at this point in the history
  • Loading branch information
vjsamuel authored and andrewkroh committed Aug 18, 2017
1 parent 4302f53 commit d20e58a
Show file tree
Hide file tree
Showing 8 changed files with 57 additions and 36 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta1...master[Check the HEAD di

- Update init scripts to use the `test config` subcommand instead of the deprecated `-configtest` flag. {issue}4600[4600]
- Get by default the credentials for connecting to Kibana from the Elasticsearch output configuration. {pull}4867[4867]
- Move TCP UDP start up into `server.Start()` {pull}4903[4903]

*Auditbeat*

Expand Down
3 changes: 2 additions & 1 deletion metricbeat/helper/server/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func NewHttpServer(mb mb.BaseMetricSet) (server.Server, error) {
return h, nil
}

func (h *HttpServer) Start() {
func (h *HttpServer) Start() error {
go func() {

logp.Info("Starting http server on %s", h.server.Addr)
Expand All @@ -67,6 +67,7 @@ func (h *HttpServer) Start() {
}
}()

return nil
}

func (h *HttpServer) Stop() {
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/helper/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ const (
// Server is an interface that can be used to implement servers which can accept data.
type Server interface {
// Start is used to start the server at a well defined port.
Start()
Start() error
// Stop the server.
Stop()
// Get a channel of events.
Expand Down
25 changes: 15 additions & 10 deletions metricbeat/helper/server/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@ import (
"fmt"
"net"

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/helper/server"
"github.com/elastic/beats/metricbeat/mb"
)

type TcpServer struct {
tcpAddr *net.TCPAddr
listener *net.TCPListener
receiveBufferSize int
done chan struct{}
Expand Down Expand Up @@ -42,25 +45,27 @@ func NewTcpServer(base mb.BaseMetricSet) (server.Server, error) {
return nil, err
}

listener, err := net.ListenTCP("tcp", addr)
if err != nil {
return nil, err
}

logp.Info("Started listening for TCP on: %s:%d", config.Host, config.Port)
return &TcpServer{
listener: listener,
tcpAddr: addr,
receiveBufferSize: config.ReceiveBufferSize,
done: make(chan struct{}),
eventQueue: make(chan server.Event),
}, nil
}

func (g *TcpServer) Start() {
go g.WatchMetrics()
func (g *TcpServer) Start() error {
listener, err := net.ListenTCP("tcp", g.tcpAddr)
if err != nil {
return errors.Wrap(err, "failed to start TCP server")
}
g.listener = listener
logp.Info("Started listening for TCP on: %s", g.tcpAddr.String())

go g.watchMetrics()
return nil
}

func (g *TcpServer) WatchMetrics() {
func (g *TcpServer) watchMetrics() {
buffer := make([]byte, g.receiveBufferSize)
for {
select {
Expand Down
14 changes: 7 additions & 7 deletions metricbeat/helper/server/tcp/tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,9 @@ func GetTestTcpServer(host string, port int) (server.Server, error) {
return nil, err
}

listener, err := net.ListenTCP("tcp", addr)
if err != nil {
return nil, err
}

logp.Info("Started listening for TCP on: %s:%d", host, port)
return &TcpServer{
listener: listener,
tcpAddr: addr,
receiveBufferSize: 1024,
done: make(chan struct{}),
eventQueue: make(chan server.Event),
Expand All @@ -43,7 +38,12 @@ func TestTcpServer(t *testing.T) {
t.FailNow()
}

svc.Start()
err = svc.Start()
if err != nil {
t.Error(err)
t.FailNow()
}

defer svc.Stop()
writeToServer(t, "test1", host, port)
msg := <-svc.GetEvents()
Expand Down
26 changes: 16 additions & 10 deletions metricbeat/helper/server/udp/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@ import (
"fmt"
"net"

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/helper/server"
"github.com/elastic/beats/metricbeat/mb"
)

type UdpServer struct {
udpaddr *net.UDPAddr
listener *net.UDPConn
receiveBufferSize int
done chan struct{}
Expand Down Expand Up @@ -43,25 +46,28 @@ func NewUdpServer(base mb.BaseMetricSet) (server.Server, error) {
return nil, err
}

listener, err := net.ListenUDP("udp", addr)
if err != nil {
return nil, err
}

logp.Info("Started listening for UDP on: %s:%d", config.Host, config.Port)
return &UdpServer{
listener: listener,
udpaddr: addr,
receiveBufferSize: config.ReceiveBufferSize,
done: make(chan struct{}),
eventQueue: make(chan server.Event),
}, nil
}

func (g *UdpServer) Start() {
go g.WatchMetrics()
func (g *UdpServer) Start() error {
listener, err := net.ListenUDP("udp", g.udpaddr)
if err != nil {
return errors.Wrap(err, "failed to start UDP server")
}

logp.Info("Started listening for UDP on: %s", g.udpaddr.String())
g.listener = listener

go g.watchMetrics()
return nil
}

func (g *UdpServer) WatchMetrics() {
func (g *UdpServer) watchMetrics() {
buffer := make([]byte, g.receiveBufferSize)
for {
select {
Expand Down
12 changes: 6 additions & 6 deletions metricbeat/helper/server/udp/udp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,9 @@ func GetTestUdpServer(host string, port int) (server.Server, error) {
return nil, err
}

listener, err := net.ListenUDP("udp", addr)
if err != nil {
return nil, err
}

logp.Info("Started listening for UDP on: %s:%d", host, port)
return &UdpServer{
listener: listener,
udpaddr: addr,
receiveBufferSize: 1024,
done: make(chan struct{}),
eventQueue: make(chan server.Event),
Expand All @@ -44,6 +39,11 @@ func TestUdpServer(t *testing.T) {
}

svc.Start()
if err != nil {
t.Error(err)
t.FailNow()
}

defer svc.Stop()
writeToServer(t, "test1", host, port)
msg := <-svc.GetEvents()
Expand Down
10 changes: 9 additions & 1 deletion metricbeat/module/graphite/server/server.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package server

import (
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common/cfgwarn"
"github.com/elastic/beats/libbeat/logp"
serverhelper "github.com/elastic/beats/metricbeat/helper/server"
"github.com/elastic/beats/metricbeat/helper/server/tcp"
"github.com/elastic/beats/metricbeat/helper/server/udp"
Expand Down Expand Up @@ -61,7 +64,12 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// Run method provides the Graphite server with a reporter with which events can be reported.
func (m *MetricSet) Run(reporter mb.PushReporter) {
// Start event watcher
m.server.Start()
if err := m.server.Start(); err != nil {
err = errors.Wrap(err, "failed to start graphite server")
logp.Err("%v", err)
reporter.Error(err)
return
}

for {
select {
Expand Down

0 comments on commit d20e58a

Please sign in to comment.