diff --git a/filebeat/publish/publish.go b/filebeat/publish/publish.go index 395483d4dab..ae6531ef8e1 100644 --- a/filebeat/publish/publish.go +++ b/filebeat/publish/publish.go @@ -18,7 +18,7 @@ type LogPublisher interface { } type syncLogPublisher struct { - pub *publisher.Publisher + pub publisher.Publisher client publisher.Client in, out chan []*input.FileEvent @@ -27,7 +27,7 @@ type syncLogPublisher struct { } type asyncLogPublisher struct { - pub *publisher.Publisher + pub publisher.Publisher client publisher.Client in, out chan []*input.FileEvent @@ -71,7 +71,7 @@ var ( func New( async bool, in, out chan []*input.FileEvent, - pub *publisher.Publisher, + pub publisher.Publisher, ) LogPublisher { if async { return newAsyncLogPublisher(in, out, pub) @@ -81,7 +81,7 @@ func New( func newSyncLogPublisher( in, out chan []*input.FileEvent, - pub *publisher.Publisher, + pub publisher.Publisher, ) *syncLogPublisher { return &syncLogPublisher{ in: in, @@ -138,7 +138,7 @@ func (p *syncLogPublisher) Stop() { func newAsyncLogPublisher( in, out chan []*input.FileEvent, - pub *publisher.Publisher, + pub publisher.Publisher, ) *asyncLogPublisher { return &asyncLogPublisher{ in: in, diff --git a/filebeat/publish/publish_test.go b/filebeat/publish/publish_test.go index f0ad101ec9c..a55203a2cd1 100644 --- a/filebeat/publish/publish_test.go +++ b/filebeat/publish/publish_test.go @@ -47,7 +47,7 @@ func TestPublisherModes(t *testing.T) { regChan := make(chan []*input.FileEvent, len(test.order)+1) client := pubtest.NewChanClient(0) - pub := New(test.async, pubChan, regChan, client) + pub := New(test.async, pubChan, regChan, pubtest.PublisherWithClient(client)) pub.Start() var events [][]*input.FileEvent diff --git a/libbeat/beat/beat.go b/libbeat/beat/beat.go index b60a79ca383..2a0f54baaae 100644 --- a/libbeat/beat/beat.go +++ b/libbeat/beat/beat.go @@ -80,12 +80,12 @@ type Creator func(*Beat, *common.Config) (Beater, error) // Beat contains the basic beat data and the publisher client used to publish // events. type Beat struct { - Name string // Beat name. - Version string // Beat version number. Defaults to the libbeat version when an implementation does not set a version. - UUID uuid.UUID // ID assigned to a Beat instance. - RawConfig *common.Config // Raw config that can be unpacked to get Beat specific config data. - Config BeatConfig // Common Beat configuration data. - Publisher *publisher.Publisher // Publisher + Name string // Beat name. + Version string // Beat version number. Defaults to the libbeat version when an implementation does not set a version. + UUID uuid.UUID // ID assigned to a Beat instance. + RawConfig *common.Config // Raw config that can be unpacked to get Beat specific config data. + Config BeatConfig // Common Beat configuration data. + Publisher publisher.Publisher // Publisher } // BeatConfig struct contains the basic configuration of every beat diff --git a/libbeat/publisher/async.go b/libbeat/publisher/async.go index 55d77836fdf..71a8d1d6bc8 100644 --- a/libbeat/publisher/async.go +++ b/libbeat/publisher/async.go @@ -7,7 +7,7 @@ import ( type asyncPipeline struct { outputs []worker - pub *Publisher + pub *BeatPublisher } const ( @@ -15,7 +15,7 @@ const ( ) func newAsyncPipeline( - pub *Publisher, + pub *BeatPublisher, hwm, bulkHWM int, ws *workerSignal, ) *asyncPipeline { diff --git a/libbeat/publisher/client.go b/libbeat/publisher/client.go index c33e23c86ac..2eec69bd225 100644 --- a/libbeat/publisher/client.go +++ b/libbeat/publisher/client.go @@ -62,12 +62,12 @@ type Client interface { type client struct { canceler *op.Canceler - publisher *Publisher + publisher *BeatPublisher beatMeta common.MapStr // Beat metadata that is added to all events. globalEventMetadata common.EventMetadata // Fields and tags that are added to all events. } -func newClient(pub *Publisher) *client { +func newClient(pub *BeatPublisher) *client { c := &client{ canceler: op.NewCanceler(), diff --git a/libbeat/publisher/client_test.go b/libbeat/publisher/client_test.go index 0b12d222f19..86fa9ce13ec 100644 --- a/libbeat/publisher/client_test.go +++ b/libbeat/publisher/client_test.go @@ -13,7 +13,7 @@ import ( // ClientOptions. func TestGetClient(t *testing.T) { c := &client{ - publisher: &Publisher{}, + publisher: &BeatPublisher{}, } c.publisher.pipelines.async = &asyncPipeline{} c.publisher.pipelines.sync = &syncPipeline{} diff --git a/libbeat/publisher/common_test.go b/libbeat/publisher/common_test.go index 20695a58217..911c9a23a92 100644 --- a/libbeat/publisher/common_test.go +++ b/libbeat/publisher/common_test.go @@ -129,7 +129,7 @@ func testEvent() common.MapStr { } type testPublisher struct { - pub *Publisher + pub *BeatPublisher outputMsgHandler *testMessageHandler client *client } @@ -147,7 +147,7 @@ const ( ) func newTestPublisher(bulkSize int, response OutputResponse) *testPublisher { - pub := &Publisher{} + pub := &BeatPublisher{} pub.wsOutput.Init() pub.wsPublisher.Init() diff --git a/libbeat/publisher/publish.go b/libbeat/publisher/publish.go index 72d7bf1b7e2..9a3da46bb35 100644 --- a/libbeat/publisher/publish.go +++ b/libbeat/publisher/publish.go @@ -46,7 +46,11 @@ type TransactionalEventPublisher interface { PublishTransaction(transaction op.Signaler, events []common.MapStr) } -type Publisher struct { +type Publisher interface { + Connect() Client +} + +type BeatPublisher struct { shipperName string // Shipper name as set in the configuration file hostname string // Host name as returned by the operation system name string // The shipperName if configured, the hostname otherwise @@ -55,8 +59,8 @@ type Publisher struct { Index string Output []*outputWorker TopologyOutput outputs.TopologyOutputer - IgnoreOutgoing bool - GeoLite *libgeo.GeoIP + ignoreOutgoing bool + geoLite *libgeo.GeoIP Processors *processors.Processors globalEventMetadata common.EventMetadata // Fields and tags to add to each event. @@ -107,7 +111,7 @@ func init() { publishDisabled = flag.Bool("N", false, "Disable actual publishing for testing") } -func (publisher *Publisher) IsPublisherIP(ip string) bool { +func (publisher *BeatPublisher) IsPublisherIP(ip string) bool { for _, myip := range publisher.IpAddrs { if myip == ip { return true @@ -117,7 +121,7 @@ func (publisher *Publisher) IsPublisherIP(ip string) bool { return false } -func (publisher *Publisher) GetServerName(ip string) string { +func (publisher *BeatPublisher) GetServerName(ip string) string { // in case the IP is localhost, return current shipper name islocal, err := common.IsLoopback(ip) if err != nil { @@ -137,18 +141,26 @@ func (publisher *Publisher) GetServerName(ip string) string { return "" } -func (publisher *Publisher) Connect() Client { +func (publisher *BeatPublisher) GeoLite() *libgeo.GeoIP { + return publisher.geoLite +} + +func (publisher *BeatPublisher) IgnoreOutgoing() bool { + return publisher.ignoreOutgoing +} + +func (publisher *BeatPublisher) Connect() Client { atomic.AddUint32(&publisher.numClients, 1) return newClient(publisher) } -func (publisher *Publisher) UpdateTopologyPeriodically() { +func (publisher *BeatPublisher) UpdateTopologyPeriodically() { for range publisher.RefreshTopologyTimer { _ = publisher.PublishTopology() // ignore errors } } -func (publisher *Publisher) PublishTopology(params ...string) error { +func (publisher *BeatPublisher) PublishTopology(params ...string) error { localAddrs := params if len(params) == 0 { @@ -178,9 +190,9 @@ func New( configs map[string]*common.Config, shipper ShipperConfig, processors *processors.Processors, -) (*Publisher, error) { +) (*BeatPublisher, error) { - publisher := Publisher{} + publisher := BeatPublisher{} err := publisher.init(beatName, configs, shipper, processors) if err != nil { return nil, err @@ -188,14 +200,14 @@ func New( return &publisher, nil } -func (publisher *Publisher) init( +func (publisher *BeatPublisher) init( beatName string, configs map[string]*common.Config, shipper ShipperConfig, processors *processors.Processors, ) error { var err error - publisher.IgnoreOutgoing = shipper.Ignore_outgoing + publisher.ignoreOutgoing = shipper.Ignore_outgoing publisher.Processors = processors publisher.disabled = *publishDisabled @@ -213,7 +225,7 @@ func (publisher *Publisher) init( bulkHWM = *shipper.BulkQueueSize } - publisher.GeoLite = common.LoadGeoIPData(shipper.Geoip) + publisher.geoLite = common.LoadGeoIPData(shipper.Geoip) publisher.wsPublisher.Init() publisher.wsOutput.Init() @@ -321,7 +333,7 @@ func (publisher *Publisher) init( return nil } -func (publisher *Publisher) Stop() { +func (publisher *BeatPublisher) Stop() { if atomic.LoadUint32(&publisher.numClients) > 0 { panic("All clients must disconnect before shutting down publisher pipeline") } diff --git a/libbeat/publisher/publish_test.go b/libbeat/publisher/publish_test.go index a76485ff25a..e05c81138a3 100644 --- a/libbeat/publisher/publish_test.go +++ b/libbeat/publisher/publish_test.go @@ -37,7 +37,7 @@ func (topo testTopology) GetNameByIP(ip string) string { // Test GetServerName. func TestPublisherTypeGetServerName(t *testing.T) { - pt := &Publisher{name: shipperName} + pt := &BeatPublisher{name: shipperName} assert.Equal(t, shipperName, pt.GetServerName("127.0.0.1")) // Unknown hosts return empty string. @@ -57,7 +57,7 @@ func TestPublisherTypeUpdateTopologyPeriodically(t *testing.T) { publishName: make(chan string, 1), publishLocalAddrs: make(chan []string, 1), } - pt := &Publisher{ + pt := &BeatPublisher{ name: shipperName, RefreshTopologyTimer: c, TopologyOutput: topo, diff --git a/libbeat/publisher/sync.go b/libbeat/publisher/sync.go index a4f84b55956..b72668c8307 100644 --- a/libbeat/publisher/sync.go +++ b/libbeat/publisher/sync.go @@ -3,10 +3,10 @@ package publisher import "github.com/elastic/beats/libbeat/common/op" type syncPipeline struct { - pub *Publisher + pub *BeatPublisher } -func newSyncPipeline(pub *Publisher, hwm, bulkHWM int) *syncPipeline { +func newSyncPipeline(pub *BeatPublisher, hwm, bulkHWM int) *syncPipeline { return &syncPipeline{pub: pub} } diff --git a/libbeat/publisher/testing/testing.go b/libbeat/publisher/testing/testing.go index 1f0d7b94656..8cc89275c56 100644 --- a/libbeat/publisher/testing/testing.go +++ b/libbeat/publisher/testing/testing.go @@ -6,6 +6,10 @@ import ( "github.com/elastic/beats/libbeat/publisher" ) +type TestPublisher struct { + client publisher.Client +} + // given channel only. type ChanClient struct { done chan struct{} @@ -19,6 +23,14 @@ type PublishMessage struct { Events []common.MapStr } +func PublisherWithClient(client publisher.Client) publisher.Publisher { + return &TestPublisher{client} +} + +func (pub *TestPublisher) Connect() publisher.Client { + return pub.client +} + func NewChanClient(bufSize int) *ChanClient { return NewChanClientWith(make(chan PublishMessage, bufSize)) } diff --git a/packetbeat/beater/packetbeat.go b/packetbeat/beater/packetbeat.go index 8857b7c8c19..f143fcfe11d 100644 --- a/packetbeat/beater/packetbeat.go +++ b/packetbeat/beater/packetbeat.go @@ -99,8 +99,8 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { func (pb *Packetbeat) init(b *beat.Beat) error { cfg := &pb.Config.Packetbeat - - if err := procs.ProcWatcher.Init(cfg.Procs); err != nil { + err := procs.ProcWatcher.Init(cfg.Procs) + if err != nil { logp.Critical(err.Error()) return err } @@ -113,16 +113,20 @@ func (pb *Packetbeat) init(b *beat.Beat) error { if b.Config.Shipper.BulkQueueSize != nil { bulkQueueSize = *b.Config.Shipper.BulkQueueSize } - pb.Pub = publish.NewPublisher(b.Publisher, queueSize, bulkQueueSize) + pb.Pub, err = publish.NewPublisher(b.Publisher, queueSize, bulkQueueSize) + if err != nil { + return fmt.Errorf("Initializing publisher failed: %v", err) + } logp.Debug("main", "Initializing protocol plugins") - err := protos.Protos.Init(false, pb.Pub, cfg.Protocols) + err = protos.Protos.Init(false, pb.Pub, cfg.Protocols) if err != nil { return fmt.Errorf("Initializing protocol analyzers failed: %v", err) } logp.Debug("main", "Initializing sniffer") - if err := pb.setupSniffer(); err != nil { + err = pb.setupSniffer() + if err != nil { return fmt.Errorf("Initializing sniffer failed: %v", err) } diff --git a/packetbeat/publish/publish.go b/packetbeat/publish/publish.go index 8bdb2c7c53f..151b8f3613f 100644 --- a/packetbeat/publish/publish.go +++ b/packetbeat/publish/publish.go @@ -8,6 +8,7 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/publisher" + "github.com/nranchev/go-libGeoIP" ) type Transactions interface { @@ -19,9 +20,13 @@ type Flows interface { } type PacketbeatPublisher struct { - pub *publisher.Publisher + pub publisher.Publisher client publisher.Client + topo TopologyProvider + geoLite *libgeo.GeoIP + ignoreOutgoing bool + wg sync.WaitGroup done chan struct{} @@ -33,6 +38,16 @@ type ChanTransactions struct { Channel chan common.MapStr } +// XXX: currently implemented by libbeat publisher. This functionality is only +// required by packetbeat. Source for TopologyProvider should become local to +// packetbeat. +type TopologyProvider interface { + IsPublisherIP(ip string) bool + GetServerName(ip string) string + GeoLite() *libgeo.GeoIP + IgnoreOutgoing() bool +} + func (t *ChanTransactions) PublishTransaction(event common.MapStr) bool { t.Channel <- event return true @@ -40,14 +55,25 @@ func (t *ChanTransactions) PublishTransaction(event common.MapStr) bool { var debugf = logp.MakeDebug("publish") -func NewPublisher(pub *publisher.Publisher, hwm, bulkHWM int) *PacketbeatPublisher { - return &PacketbeatPublisher{ - pub: pub, - client: pub.Connect(), - done: make(chan struct{}), - trans: make(chan common.MapStr, hwm), - flows: make(chan []common.MapStr, bulkHWM), +func NewPublisher( + pub publisher.Publisher, + hwm, bulkHWM int, +) (*PacketbeatPublisher, error) { + topo, ok := pub.(TopologyProvider) + if !ok { + return nil, errors.New("Requires topology provider") } + + return &PacketbeatPublisher{ + pub: pub, + topo: topo, + geoLite: topo.GeoLite(), + ignoreOutgoing: topo.IgnoreOutgoing(), + client: pub.Connect(), + done: make(chan struct{}), + trans: make(chan common.MapStr, hwm), + flows: make(chan []common.MapStr, bulkHWM), + }, nil } func (t *PacketbeatPublisher) PublishTransaction(event common.MapStr) bool { @@ -110,7 +136,7 @@ func (t *PacketbeatPublisher) onTransaction(event common.MapStr) { return } - if !normalizeTransAddr(t.pub, event) { + if !t.normalizeTransAddr(event) { return } @@ -125,7 +151,7 @@ func (t *PacketbeatPublisher) onFlow(events []common.MapStr) { continue } - if !addGeoIPToFlow(t.pub, event) { + if !t.addGeoIPToFlow(event) { continue } @@ -161,7 +187,7 @@ func validateEvent(event common.MapStr) error { return nil } -func normalizeTransAddr(pub *publisher.Publisher, event common.MapStr) bool { +func (p *PacketbeatPublisher) normalizeTransAddr(event common.MapStr) bool { debugf("normalize address for: %v", event) var srcServer, dstServer string @@ -169,9 +195,9 @@ func normalizeTransAddr(pub *publisher.Publisher, event common.MapStr) bool { debugf("has src: %v", ok) if ok { // check if it's outgoing transaction (as client) - isOutgoing := pub.IsPublisherIP(src.Ip) + isOutgoing := p.topo.IsPublisherIP(src.Ip) if isOutgoing { - if pub.IgnoreOutgoing { + if p.ignoreOutgoing { // duplicated transaction -> ignore it debugf("Ignore duplicated transaction on: %s -> %s", srcServer, dstServer) return false @@ -181,7 +207,7 @@ func normalizeTransAddr(pub *publisher.Publisher, event common.MapStr) bool { event["direction"] = "out" } - srcServer = pub.GetServerName(src.Ip) + srcServer = p.topo.GetServerName(src.Ip) event["client_ip"] = src.Ip event["client_port"] = src.Port event["client_proc"] = src.Proc @@ -192,7 +218,7 @@ func normalizeTransAddr(pub *publisher.Publisher, event common.MapStr) bool { dst, ok := event["dst"].(*common.Endpoint) debugf("has dst: %v", ok) if ok { - dstServer = pub.GetServerName(dst.Ip) + dstServer = p.topo.GetServerName(dst.Ip) event["ip"] = dst.Ip event["port"] = dst.Port event["proc"] = dst.Proc @@ -200,24 +226,24 @@ func normalizeTransAddr(pub *publisher.Publisher, event common.MapStr) bool { delete(event, "dst") //check if it's incoming transaction (as server) - if pub.IsPublisherIP(dst.Ip) { + if p.topo.IsPublisherIP(dst.Ip) { // incoming transaction event["direction"] = "in" } } - if pub.GeoLite != nil { + if p.geoLite != nil { realIP, exists := event["real_ip"] if exists && len(realIP.(common.NetString)) > 0 { - loc := pub.GeoLite.GetLocationByIP(string(realIP.(common.NetString))) + loc := p.geoLite.GetLocationByIP(string(realIP.(common.NetString))) if loc != nil && loc.Latitude != 0 && loc.Longitude != 0 { loc := fmt.Sprintf("%f, %f", loc.Latitude, loc.Longitude) event["client_location"] = loc } } else { if len(srcServer) == 0 && src != nil { // only for external IP addresses - loc := pub.GeoLite.GetLocationByIP(src.Ip) + loc := p.geoLite.GetLocationByIP(src.Ip) if loc != nil && loc.Latitude != 0 && loc.Longitude != 0 { loc := fmt.Sprintf("%f, %f", loc.Latitude, loc.Longitude) event["client_location"] = loc @@ -229,7 +255,7 @@ func normalizeTransAddr(pub *publisher.Publisher, event common.MapStr) bool { return true } -func addGeoIPToFlow(pub *publisher.Publisher, event common.MapStr) bool { +func (p *PacketbeatPublisher) addGeoIPToFlow(event common.MapStr) bool { getLocation := func(host common.MapStr, ip_type string) string { @@ -243,7 +269,7 @@ func addGeoIPToFlow(pub *publisher.Publisher, event common.MapStr) bool { logp.Warn("IP address must be string") return "" } - loc := pub.GeoLite.GetLocationByIP(str) + loc := p.geoLite.GetLocationByIP(str) if loc == nil || loc.Latitude == 0 || loc.Longitude == 0 { return "" } @@ -251,7 +277,7 @@ func addGeoIPToFlow(pub *publisher.Publisher, event common.MapStr) bool { return fmt.Sprintf("%f, %f", loc.Latitude, loc.Longitude) } - if pub.GeoLite == nil { + if p.geoLite == nil { return true } diff --git a/packetbeat/publish/publish_test.go b/packetbeat/publish/publish_test.go index dcf349387e5..cd2d2c2ebae 100644 --- a/packetbeat/publish/publish_test.go +++ b/packetbeat/publish/publish_test.go @@ -63,6 +63,7 @@ func TestFilterEvent(t *testing.T) { func TestDirectionOut(t *testing.T) { publisher := newTestPublisher([]string{"192.145.2.4"}) + ppub, _ := NewPublisher(publisher, 1000, 1) event := common.MapStr{ "src": &common.Endpoint{ @@ -81,13 +82,14 @@ func TestDirectionOut(t *testing.T) { }, } - assert.True(t, normalizeTransAddr(publisher, event)) + assert.True(t, ppub.normalizeTransAddr(event)) assert.True(t, event["client_ip"] == "192.145.2.4") assert.True(t, event["direction"] == "out") } func TestDirectionIn(t *testing.T) { publisher := newTestPublisher([]string{"192.145.2.5"}) + ppub, _ := NewPublisher(publisher, 1000, 1) event := common.MapStr{ "src": &common.Endpoint{ @@ -106,19 +108,20 @@ func TestDirectionIn(t *testing.T) { }, } - assert.True(t, normalizeTransAddr(publisher, event)) + assert.True(t, ppub.normalizeTransAddr(event)) assert.True(t, event["client_ip"] == "192.145.2.4") assert.True(t, event["direction"] == "in") } -func newTestPublisher(ips []string) *publisher.Publisher { - p := &publisher.Publisher{} +func newTestPublisher(ips []string) *publisher.BeatPublisher { + p := &publisher.BeatPublisher{} p.IpAddrs = ips return p } func TestNoDirection(t *testing.T) { publisher := newTestPublisher([]string{"192.145.2.6"}) + ppub, _ := NewPublisher(publisher, 1000, 1) event := common.MapStr{ "src": &common.Endpoint{ @@ -137,7 +140,7 @@ func TestNoDirection(t *testing.T) { }, } - assert.True(t, normalizeTransAddr(publisher, event)) + assert.True(t, ppub.normalizeTransAddr(event)) assert.True(t, event["client_ip"] == "192.145.2.4") _, ok := event["direction"] assert.False(t, ok)