Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UDP prospector type with plain harvester #4452

Merged
merged 2 commits into from
Jun 23, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ https://github.com/elastic/beats/compare/v6.0.0-alpha1...master[Check the HEAD d
- Nginx module: use the first not-private IP address as the remote_ip. {pull}4417[4417]
- Load Ingest Node pipelines when the Elasticsearch connection is established, instead of only once at startup. {pull}4479[4479]
- Add support for loading Xpack Machine Learning configurations from the modules, and added sample configurations for the Nginx module. {pull}4506[4506]
- Add udp prospector type. {pull}4452[4452]

*Heartbeat*

Expand Down
4 changes: 4 additions & 0 deletions filebeat/_meta/common.full.p2.yml
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,10 @@ filebeat.prospectors:
# Redis AUTH password. Empty by default.
#password: foobared

#- type: udp

# Maximum size of the message received over UDP
#max_message_size: 10240
#========================= Filebeat global options ============================

# Event count spool threshold - forces network flush if exceeded
Expand Down
4 changes: 4 additions & 0 deletions filebeat/filebeat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,10 @@ filebeat.prospectors:
# Redis AUTH password. Empty by default.
#password: foobared

#- type: udp

# Maximum size of the message received over UDP
#max_message_size: 10240
#========================= Filebeat global options ============================

# Event count spool threshold - forces network flush if exceeded
Expand Down
2 changes: 2 additions & 0 deletions filebeat/harvester/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ const (
LogType = "log"
StdinType = "stdin"
RedisType = "redis"
UdpType = "udp"
)

// ValidType of valid input types
var ValidType = map[string]struct{}{
StdinType: {},
LogType: {},
RedisType: {},
UdpType: {},
}

// MatchAny checks if the text matches any of the regular expressions
Expand Down
3 changes: 3 additions & 0 deletions filebeat/prospector/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/elastic/beats/filebeat/prospector/log"
"github.com/elastic/beats/filebeat/prospector/redis"
"github.com/elastic/beats/filebeat/prospector/stdin"
"github.com/elastic/beats/filebeat/prospector/udp"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
)
Expand Down Expand Up @@ -77,6 +78,8 @@ func (p *Prospector) initProspectorer(outlet channel.Outleter, states []file.Sta
prospectorer, err = redis.NewProspector(config, outlet)
case harvester.LogType:
prospectorer, err = log.NewProspector(config, states, outlet, p.done)
case harvester.UdpType:
prospectorer, err = udp.NewProspector(config, outlet)
default:
return fmt.Errorf("invalid prospector type: %v. Change type", p.config.Type)
}
Expand Down
20 changes: 20 additions & 0 deletions filebeat/prospector/udp/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package udp

import (
"github.com/elastic/beats/filebeat/harvester"
)

var defaultConfig = config{
ForwarderConfig: harvester.ForwarderConfig{
Type: "udp",
},
MaxMessageSize: 10240,
// TODO: What should be default port?
Host: "localhost:8080",
}

type config struct {
harvester.ForwarderConfig `config:",inline"`
Host string `config:"host"`
MaxMessageSize int `config:"max_message_size"`
}
72 changes: 72 additions & 0 deletions filebeat/prospector/udp/harvester.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package udp

import (
"net"
"time"

"github.com/elastic/beats/filebeat/harvester"
"github.com/elastic/beats/filebeat/util"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
)

type Harvester struct {
forwarder *harvester.Forwarder
done chan struct{}
cfg *common.Config
listener net.PacketConn
}

func NewHarvester(forwarder *harvester.Forwarder, cfg *common.Config) *Harvester {
return &Harvester{
done: make(chan struct{}),
cfg: cfg,
forwarder: forwarder,
}
}

func (h *Harvester) Run() error {

config := defaultConfig
err := h.cfg.Unpack(&config)
if err != nil {
return err
}

h.listener, err = net.ListenPacket("udp", config.Host)
if err != nil {
return err
}
defer h.listener.Close()

logp.Info("Started listening for udp on: %s", config.Host)

buffer := make([]byte, config.MaxMessageSize)

for {
select {
case <-h.done:
return nil
default:
}

length, _, err := h.listener.ReadFrom(buffer)
if err != nil {
logp.Err("Error reading from buffer: %v", err.Error())
continue
}
data := util.NewData()
event := common.MapStr{
"message": string(buffer[:length]),
"@timestamp": time.Now(),
}
data.Event = event
h.forwarder.Send(data)
}
}

func (h *Harvester) Stop() {
logp.Info("Stopping udp harvester")
close(h.done)
h.listener.Close()
}
47 changes: 47 additions & 0 deletions filebeat/prospector/udp/prospector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package udp

import (
"github.com/elastic/beats/filebeat/channel"
"github.com/elastic/beats/filebeat/harvester"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
)

type Prospector struct {
harvester *Harvester
started bool
}

func NewProspector(cfg *common.Config, outlet channel.Outleter) (*Prospector, error) {
forwarder, err := harvester.NewForwarder(cfg, outlet)
if err != nil {
return nil, err
}
return &Prospector{
harvester: NewHarvester(forwarder, cfg),
started: false,
}, nil
}

func (p *Prospector) Run() {
logp.Info("Starting udp prospector")

if !p.started {
p.started = true
go func() {
err := p.harvester.Run()
if err != nil {
logp.Err("Error running harvester:: %v", err)
}
}()
}
}

func (p *Prospector) Stop() {
logp.Info("Stopping udp prospector")
p.harvester.Stop()
}

func (p *Prospector) Wait() {
p.Stop()
}
41 changes: 41 additions & 0 deletions filebeat/tests/system/test_udp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from filebeat import BaseTest
import socket
import time


class Test(BaseTest):

def test_udp(self):

host = "127.0.0.1"
port = 8080
prospector_raw = """
- type: udp
host: "{}:{}"
enabled: true
"""

prospector_raw = prospector_raw.format(host, port)

self.render_config_template(
prospector_raw=prospector_raw,
prospectors=False,
)

filebeat = self.start_beat()

self.wait_until(lambda: self.log_contains("Started listening for udp"))

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP

for n in range(0, 2):
sock.sendto("Hello World: " + str(n), (host, port))

self.wait_until(lambda: self.output_count(lambda x: x >= 2))

filebeat.check_kill_and_wait()

output = self.read_output()

assert len(output) == 2
assert output[0]["prospector.type"] == "udp"