From 17c12ea715307a2eb1e5af5234001ec58ded6eaf Mon Sep 17 00:00:00 2001 From: urso Date: Tue, 19 Jan 2016 20:57:09 +0100 Subject: [PATCH] move preprocessor from libbeat and move packetbeat - remove preprocessor worker completely from libbeat - introduce transaction publisher in packetbeat to event processing: - GeoIP - normalize addresses - simplify sync/async publisher client - update new_protocol docs and changelog --- CHANGELOG.asciidoc | 1 + filebeat/input/file.go | 1 + libbeat/beat/beat.go | 18 +- libbeat/publisher/async.go | 36 ++-- libbeat/publisher/client.go | 32 +++ libbeat/publisher/preprocess.go | 195 ------------------ libbeat/publisher/publish.go | 19 +- libbeat/publisher/publish_test.go | 6 - libbeat/publisher/sync.go | 56 ++--- libbeat/publisher/sync_test.go | 5 + packetbeat/beat/packetbeat.go | 21 +- packetbeat/docs/new_protocol.asciidoc | 2 +- packetbeat/protos/dns/dns.go | 8 +- packetbeat/protos/dns/dns_tcp_test.go | 16 +- packetbeat/protos/dns/dns_test.go | 6 +- packetbeat/protos/dns/dns_udp_test.go | 10 +- packetbeat/protos/http/http.go | 8 +- packetbeat/protos/http/http_test.go | 8 +- packetbeat/protos/icmp/icmp.go | 8 +- packetbeat/protos/icmp/icmp_test.go | 6 +- packetbeat/protos/memcache/memcache.go | 10 +- packetbeat/protos/mongodb/mongodb.go | 8 +- packetbeat/protos/mongodb/mongodb_test.go | 6 +- packetbeat/protos/mysql/mysql.go | 8 +- packetbeat/protos/mysql/mysql_test.go | 6 +- packetbeat/protos/pgsql/pgsql.go | 8 +- packetbeat/protos/pgsql/pgsql_test.go | 6 +- packetbeat/protos/protos.go | 4 +- packetbeat/protos/protos_test.go | 8 +- packetbeat/protos/redis/redis.go | 8 +- packetbeat/protos/tcp/tcp_test.go | 8 +- packetbeat/protos/thrift/thrift.go | 8 +- packetbeat/protos/udp/udp_test.go | 4 +- packetbeat/publish/publish.go | 187 +++++++++++++++++ .../publish/publish_test.go | 46 +++-- .../tests/system/test_0013_redis_basic.py | 2 +- topbeat/beat/topbeat.go | 3 + winlogbeat/eventlog/eventlog.go | 1 + 38 files changed, 431 insertions(+), 362 deletions(-) delete mode 100644 libbeat/publisher/preprocess.go create mode 100644 packetbeat/publish/publish.go rename libbeat/publisher/preprocess_test.go => packetbeat/publish/publish_test.go (70%) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 13194fc9085..242bc07e160 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -15,6 +15,7 @@ https://github.com/elastic/beats/compare/1.0.0...master[Check the HEAD diff] *Affecting all Beats* - Some publisher options refactoring in libbeat {pull}684[684] - Run function to start a beat no returns an error instead of directly exiting. {pull}771[771] +- Move event preprocessor applying GeoIP to packetbeat {pull}772[772] *Packetbeat* diff --git a/filebeat/input/file.go b/filebeat/input/file.go index 5f5b84288a5..0f0f3c0679d 100644 --- a/filebeat/input/file.go +++ b/filebeat/input/file.go @@ -72,6 +72,7 @@ func (f *FileEvent) ToMapStr() common.MapStr { "message": f.Text, "type": f.DocumentType, "input_type": f.InputType, + "count": 1, } if f.Fields != nil { diff --git a/libbeat/beat/beat.go b/libbeat/beat/beat.go index 7dde956ebb4..e0e15264e4e 100644 --- a/libbeat/beat/beat.go +++ b/libbeat/beat/beat.go @@ -28,13 +28,13 @@ import ( "runtime" "sync" + "github.com/satori/go.uuid" + "github.com/elastic/beats/libbeat/cfgfile" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/outputs" "github.com/elastic/beats/libbeat/publisher" "github.com/elastic/beats/libbeat/service" - - "github.com/satori/go.uuid" ) // Beater interface that every beat must use @@ -56,12 +56,13 @@ type FlagsHandler interface { // Basic beat information type Beat struct { - Name string - Version string - Config *BeatConfig - BT Beater - Events publisher.Client - UUID uuid.UUID + Name string + Version string + Config *BeatConfig + BT Beater + Publisher *publisher.PublisherType + Events publisher.Client + UUID uuid.UUID exit chan struct{} error error @@ -212,6 +213,7 @@ func (b *Beat) LoadConfig() error { return fmt.Errorf("error Initialising publisher: %v\n", err) } + b.Publisher = pub b.Events = pub.Client() logp.Info("Init Beat: %s; Version: %s", b.Name, b.Version) diff --git a/libbeat/publisher/async.go b/libbeat/publisher/async.go index 5f1a89bb1ff..0651a99ba1a 100644 --- a/libbeat/publisher/async.go +++ b/libbeat/publisher/async.go @@ -9,7 +9,6 @@ import ( ) type asyncPublisher struct { - messageWorker outputs []worker pub *PublisherType ws workerSignal @@ -30,28 +29,12 @@ func newAsyncPublisher(pub *PublisherType, hwm, bulkHWM int) *asyncPublisher { } p.outputs = outputs - p.messageWorker.init(&pub.wsPublisher, hwm, bulkHWM, newPreprocessor(pub, p)) return p } // onStop will send stop signal to message batching workers func (p *asyncPublisher) onStop() { p.ws.stop() } -func (p *asyncPublisher) onMessage(m message) { - debug("async forward to outputers (%v)", len(p.outputs)) - - // m.signal is not set yet. But a async client type supporting signals might - // be implemented in the furute. - // If m.signal is nil, NewSplitSignaler will return nil -> signaler will - // only set if client did send one - if m.context.signal != nil && len(p.outputs) > 1 { - m.context.signal = outputs.NewSplitSignaler(m.context.signal, len(p.outputs)) - } - for _, o := range p.outputs { - o.send(m) - } -} - func (p *asyncPublisher) client() eventPublisher { return p } @@ -66,6 +49,25 @@ func (p *asyncPublisher) PublishEvents(ctx context, events []common.MapStr) bool return true } +func (p *asyncPublisher) send(m message) { + if p.pub.disabled { + debug("publisher disabled") + outputs.SignalCompleted(m.context.signal) + return + } + + // m.signal is not set yet. But a async client type supporting signals might + // be implemented in the future. + // If m.signal is nil, NewSplitSignaler will return nil -> signaler will + // only set if client did send one + if m.context.signal != nil && len(p.outputs) > 1 { + m.context.signal = outputs.NewSplitSignaler(m.context.signal, len(p.outputs)) + } + for _, o := range p.outputs { + o.send(m) + } +} + func asyncOutputer(ws *workerSignal, hwm, bulkHWM int, worker *outputWorker) worker { config := worker.config diff --git a/libbeat/publisher/client.go b/libbeat/publisher/client.go index 0bfcb3d2c9e..7e2d211bf61 100644 --- a/libbeat/publisher/client.go +++ b/libbeat/publisher/client.go @@ -4,6 +4,7 @@ import ( "expvar" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/outputs" ) @@ -32,6 +33,9 @@ type ChanClient struct { type client struct { publisher *PublisherType + + beatMeta common.MapStr + tags []string } // ClientOption allows API users to set additional options when publishing events. @@ -62,18 +66,46 @@ func Signal(signaler outputs.Signaler) ClientOption { } } +func newClient(pub *PublisherType) *client { + return &client{ + publisher: pub, + beatMeta: common.MapStr{ + "name": pub.name, + "hostname": pub.hostname, + }, + tags: pub.tags, + } +} + func (c *client) PublishEvent(event common.MapStr, opts ...ClientOption) bool { + c.annotateEvent(event) + ctx, client := c.getClient(opts) publishedEvents.Add(1) return client.PublishEvent(ctx, event) } func (c *client) PublishEvents(events []common.MapStr, opts ...ClientOption) bool { + for _, event := range events { + c.annotateEvent(event) + } + ctx, client := c.getClient(opts) publishedEvents.Add(int64(len(events))) return client.PublishEvents(ctx, events) } +func (c *client) annotateEvent(event common.MapStr) { + event["beat"] = c.beatMeta + if len(c.tags) > 0 { + event["tags"] = c.tags + } + + if logp.IsDebug("publish") { + PrintPublishEvent(event) + } +} + func (c *client) getClient(opts []ClientOption) (context, eventPublisher) { var ctx context for _, opt := range opts { diff --git a/libbeat/publisher/preprocess.go b/libbeat/publisher/preprocess.go deleted file mode 100644 index 135b364b476..00000000000 --- a/libbeat/publisher/preprocess.go +++ /dev/null @@ -1,195 +0,0 @@ -package publisher - -import ( - "errors" - "fmt" - - "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/libbeat/outputs" -) - -type preprocessor struct { - handler messageHandler - pub *PublisherType -} - -func newPreprocessor(p *PublisherType, h messageHandler) *preprocessor { - return &preprocessor{ - handler: h, - pub: p, - } -} - -func (p *preprocessor) onStop() { p.handler.onStop() } - -func (p *preprocessor) onMessage(m message) { - publisher := p.pub - single := false - events := m.events - if m.event != nil { - single = true - events = []common.MapStr{m.event} - } - - var ignore []int // indices of events to be removed from events - - debug("Start Preprocessing") - - for i, event := range events { - // validate some required field - if err := filterEvent(event); err != nil { - logp.Err("Publishing event failed: %v", err) - ignore = append(ignore, i) - continue - } - - // update address and geo-ip information. Ignore event - // if address is invalid or event is found to be a duplicate - ok := updateEventAddresses(publisher, event) - if !ok { - ignore = append(ignore, i) - continue - } - - // add additional Beat meta data - event["beat"] = common.MapStr{ - "name": publisher.name, - "hostname": publisher.hostname, - } - if len(publisher.tags) > 0 { - event["tags"] = publisher.tags - } - - if logp.IsDebug("publish") { - PrintPublishEvent(event) - } - } - - // return if no event is left - if len(ignore) == len(events) { - debug("no event left, complete send") - outputs.SignalCompleted(m.context.signal) - return - } - - // remove invalid events. - // TODO: is order important? Removal can be turned into O(len(ignore)) by - // copying last element into idx and doing - // events=events[:len(events)-len(ignore)] afterwards - // Alternatively filtering could be implemented like: - // https://github.com/golang/go/wiki/SliceTricks#filtering-without-allocating - for i := len(ignore) - 1; i >= 0; i-- { - idx := ignore[i] - debug("remove event[%v]", idx) - events = append(events[:idx], events[idx+1:]...) - } - - if publisher.disabled { - debug("publisher disabled") - outputs.SignalCompleted(m.context.signal) - return - } - - debug("Forward preprocessed events") - if single { - p.handler.onMessage(message{context: m.context, event: events[0]}) - } else { - p.handler.onMessage(message{context: m.context, events: events}) - } -} - -// filterEvent validates an event for common required fields with types. -// If event is to be filtered out the reason is returned as error. -func filterEvent(event common.MapStr) error { - ts, ok := event["@timestamp"] - if !ok { - return errors.New("Missing '@timestamp' field from event") - } - - _, ok = ts.(common.Time) - if !ok { - return errors.New("Invalid '@timestamp' field from event.") - } - - err := event.EnsureCountField() - if err != nil { - return err - } - - t, ok := event["type"] - if !ok { - return errors.New("Missing 'type' field from event.") - } - - _, ok = t.(string) - if !ok { - return errors.New("Invalid 'type' field from event.") - } - - return nil -} - -func updateEventAddresses(publisher *PublisherType, event common.MapStr) bool { - var srcServer, dstServer string - src, ok := event["src"].(*common.Endpoint) - if ok { - srcServer = publisher.GetServerName(src.Ip) - event["client_ip"] = src.Ip - event["client_port"] = src.Port - event["client_proc"] = src.Proc - event["client_server"] = srcServer - delete(event, "src") - - // check if it's outgoing transaction (as client) - if publisher.IsPublisherIP(src.Ip) { - //outgoing transaction - event["direction"] = "out" - } - - } - dst, ok := event["dst"].(*common.Endpoint) - if ok { - dstServer = publisher.GetServerName(dst.Ip) - event["ip"] = dst.Ip - event["port"] = dst.Port - event["proc"] = dst.Proc - event["server"] = dstServer - delete(event, "dst") - - //check if it's incoming transaction (as server) - if publisher.IsPublisherIP(dst.Ip) { - // incoming transaction - event["direction"] = "in" - } - - } - - if publisher.IgnoreOutgoing && event["direction"] == "out" { - // duplicated transaction -> ignore it - debug("Ignore duplicated transaction on %s: %s -> %s", - publisher.name, srcServer, dstServer) - return false - } - - if publisher.GeoLite != nil { - realIP, exists := event["real_ip"] - if exists && len(realIP.(common.NetString)) > 0 { - loc := publisher.GeoLite.GetLocationByIP(string(realIP.(common.NetString))) - if loc != nil && loc.Latitude != 0 && loc.Longitude != 0 { - loc := fmt.Sprintf("%f, %f", loc.Latitude, loc.Longitude) - event["client_location"] = loc - } - } else { - if len(srcServer) == 0 && src != nil { // only for external IP addresses - loc := publisher.GeoLite.GetLocationByIP(src.Ip) - if loc != nil && loc.Latitude != 0 && loc.Longitude != 0 { - loc := fmt.Sprintf("%f, %f", loc.Latitude, loc.Longitude) - event["client_location"] = loc - } - } - } - } - - return true -} diff --git a/libbeat/publisher/publish.go b/libbeat/publisher/publish.go index d01fc0580ba..5d3b034dd5f 100644 --- a/libbeat/publisher/publish.go +++ b/libbeat/publisher/publish.go @@ -49,7 +49,7 @@ type PublisherType struct { shipperName string // Shipper name as set in the configuration file hostname string // Host name as returned by the operation system name string // The shipperName if configured, the hostname otherwise - ipaddrs []string + IpAddrs []string tags []string disabled bool Index string @@ -70,6 +70,8 @@ type PublisherType struct { syncPublisher *syncPublisher asyncPublisher *asyncPublisher + + client *client } type ShipperConfig struct { @@ -109,7 +111,7 @@ func PrintPublishEvent(event common.MapStr) { } func (publisher *PublisherType) IsPublisherIP(ip string) bool { - for _, myip := range publisher.ipaddrs { + for _, myip := range publisher.IpAddrs { if myip == ip { return true } @@ -139,7 +141,7 @@ func (publisher *PublisherType) GetServerName(ip string) string { } func (publisher *PublisherType) Client() Client { - return &client{publisher} + return publisher.client } func (publisher *PublisherType) UpdateTopologyPeriodically() { @@ -230,8 +232,12 @@ func (publisher *PublisherType) init( debug("Create output worker") outputers = append(outputers, - newOutputWorker(config, output, &publisher.wsOutput, - hwm, bulkHWM)) + newOutputWorker( + config, + output, + &publisher.wsOutput, + hwm, + bulkHWM)) if !config.Save_topology { continue @@ -284,7 +290,7 @@ func (publisher *PublisherType) init( publisher.tags = shipper.Tags //Store the publisher's IP addresses - publisher.ipaddrs, err = common.LocalIpAddrsAsStrings(false) + publisher.IpAddrs, err = common.LocalIpAddrsAsStrings(false) if err != nil { logp.Err("Failed to get local IP addresses: %s", err) return err @@ -312,5 +318,6 @@ func (publisher *PublisherType) init( publisher.asyncPublisher = newAsyncPublisher(publisher, hwm, bulkHWM) publisher.syncPublisher = newSyncPublisher(publisher, hwm, bulkHWM) + publisher.client = newClient(publisher) return nil } diff --git a/libbeat/publisher/publish_test.go b/libbeat/publisher/publish_test.go index 3364b3e3cb1..9e6eb8af516 100644 --- a/libbeat/publisher/publish_test.go +++ b/libbeat/publisher/publish_test.go @@ -55,12 +55,6 @@ func TestPublisherTypeGetServerName(t *testing.T) { assert.Equal(t, hostOnNetwork, pt.GetServerName("172.0.0.1")) } -// Test the PublisherType Client() method. -func TestPublisherTypeClient(t *testing.T) { - pt := &PublisherType{} - assert.NotNil(t, pt.Client()) -} - // Test the PublisherType UpdateTopologyPeriodically() method. func TestPublisherTypeUpdateTopologyPeriodically(t *testing.T) { // Setup. diff --git a/libbeat/publisher/sync.go b/libbeat/publisher/sync.go index 400319683ea..0523e19fadf 100644 --- a/libbeat/publisher/sync.go +++ b/libbeat/publisher/sync.go @@ -6,51 +6,53 @@ import ( ) type syncPublisher struct { - messageWorker pub *PublisherType } type syncClient func(message) bool func newSyncPublisher(pub *PublisherType, hwm, bulkHWM int) *syncPublisher { - s := &syncPublisher{pub: pub} - s.messageWorker.init(&pub.wsPublisher, hwm, bulkHWM, newPreprocessor(pub, s)) - return s + return &syncPublisher{pub: pub} } func (p *syncPublisher) client() eventPublisher { - return syncClient(p.forward) + return p } -func (p *syncPublisher) onStop() {} - -func (p *syncPublisher) onMessage(m message) { - signal := outputs.NewSplitSignaler(m.context.signal, len(p.pub.Output)) - m.context.signal = signal - for _, o := range p.pub.Output { - o.send(m) - } +func (p *syncPublisher) PublishEvent(ctx context, event common.MapStr) bool { + msg := message{context: ctx, event: event} + return p.send(msg) } -func (c syncClient) PublishEvent(ctx context, event common.MapStr) bool { - return c(message{context: ctx, event: event}) +func (p *syncPublisher) PublishEvents(ctx context, events []common.MapStr) bool { + msg := message{context: ctx, events: events} + return p.send(msg) } -func (c syncClient) PublishEvents(ctx context, events []common.MapStr) bool { - return c(message{context: ctx, events: events}) -} +func (p *syncPublisher) send(m message) bool { + if p.pub.disabled { + debug("publisher disabled") + outputs.SignalCompleted(m.context.signal) + return true + } -func (p *syncPublisher) forward(m message) bool { - sync := outputs.NewSyncSignal() signal := m.context.signal - m.context.signal = sync - p.send(m) - if sync.Wait() { - outputs.SignalCompleted(signal) - return true + sync := outputs.NewSyncSignal() + if len(p.pub.Output) > 1 { + m.context.signal = outputs.NewSplitSignaler(sync, len(p.pub.Output)) + } else { + m.context.signal = sync + } + + for _, o := range p.pub.Output { + o.send(m) } - if signal != nil { + + ok := sync.Wait() + if ok { + outputs.SignalCompleted(signal) + } else if signal != nil { signal.Failed() } - return false + return ok } diff --git a/libbeat/publisher/sync_test.go b/libbeat/publisher/sync_test.go index e99cfd126f1..6511cd3ae0d 100644 --- a/libbeat/publisher/sync_test.go +++ b/libbeat/publisher/sync_test.go @@ -4,6 +4,7 @@ import ( "testing" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" "github.com/stretchr/testify/assert" ) @@ -63,6 +64,10 @@ func TestSyncPublishEventsFailed(t *testing.T) { // Test that PublishEvent returns true when publishing is disabled. func TestSyncPublisherDisabled(t *testing.T) { + if testing.Verbose() { + logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"*"}) + } + testPub := newTestPublisherNoBulk(FailedResponse) testPub.pub.disabled = true event := testEvent() diff --git a/packetbeat/beat/packetbeat.go b/packetbeat/beat/packetbeat.go index 5388d03e22a..1a9933f8698 100644 --- a/packetbeat/beat/packetbeat.go +++ b/packetbeat/beat/packetbeat.go @@ -27,6 +27,7 @@ import ( "github.com/elastic/beats/packetbeat/protos/tcp" "github.com/elastic/beats/packetbeat/protos/thrift" "github.com/elastic/beats/packetbeat/protos/udp" + "github.com/elastic/beats/packetbeat/publish" "github.com/elastic/beats/packetbeat/sniffer" ) @@ -45,6 +46,7 @@ var EnabledProtocolPlugins map[protos.Protocol]protos.ProtocolPlugin = map[proto type Packetbeat struct { PbConfig config.Config CmdLineArgs CmdLineArgs + Pub *publish.PacketbeatPublisher Sniff *sniffer.SnifferSetup over chan bool } @@ -61,6 +63,10 @@ type CmdLineArgs struct { var cmdLineArgs CmdLineArgs +const ( + defaultQueueSize = 2048 +) + func init() { cmdLineArgs = CmdLineArgs{ File: flag.String("I", "", "file"), @@ -144,9 +150,16 @@ func (pb *Packetbeat) Setup(b *beat.Beat) error { pb.Sniff = new(sniffer.SnifferSetup) + queueSize := defaultQueueSize + if pb.PbConfig.Shipper.QueueSize != nil { + queueSize = *pb.PbConfig.Shipper.QueueSize + } + pb.Pub = publish.NewPublisher(b.Publisher, queueSize) + pb.Pub.Start() + logp.Debug("main", "Initializing protocol plugins") for proto, plugin := range EnabledProtocolPlugins { - err := plugin.Init(false, b.Events) + err := plugin.Init(false, pb.Pub) if err != nil { logp.Critical("Initializing plugin %s failed: %v", proto, err) os.Exit(1) @@ -156,7 +169,7 @@ func (pb *Packetbeat) Setup(b *beat.Beat) error { var err error - icmpProc, err := icmp.NewIcmp(false, b.Events) + icmpProc, err := icmp.NewIcmp(false, pb.Pub) if err != nil { logp.Critical(err.Error()) os.Exit(1) @@ -232,6 +245,10 @@ func (pb *Packetbeat) Cleanup(b *beat.Beat) error { time.Sleep(time.Duration(float64(protos.DefaultTransactionExpiration) * 1.2)) logp.Debug("main", "Streams and transactions should all be expired now.") } + + // TODO: + // pb.TransPub.Stop() + return nil } diff --git a/packetbeat/docs/new_protocol.asciidoc b/packetbeat/docs/new_protocol.asciidoc index 75bccf2caa1..e58845be0e7 100644 --- a/packetbeat/docs/new_protocol.asciidoc +++ b/packetbeat/docs/new_protocol.asciidoc @@ -100,7 +100,7 @@ All protocol modules implement the `TcpProtocolPlugin` or the // Functions to be exported by a protocol plugin type ProtocolPlugin interface { // Called to initialize the Plugin - Init(test_mode bool, results publisher.Client) error + Init(test_mode bool, results publish.Transactions) error // Called to return the configured ports GetPorts() []int diff --git a/packetbeat/protos/dns/dns.go b/packetbeat/protos/dns/dns.go index 4c25bb901dc..aade53d6d8d 100644 --- a/packetbeat/protos/dns/dns.go +++ b/packetbeat/protos/dns/dns.go @@ -15,10 +15,10 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/libbeat/publisher" "github.com/elastic/beats/packetbeat/config" "github.com/elastic/beats/packetbeat/protos" + "github.com/elastic/beats/packetbeat/publish" "github.com/tsg/gopacket" "github.com/tsg/gopacket/layers" @@ -147,7 +147,7 @@ type Dns struct { transactions *common.Cache transactionTimeout time.Duration - results publisher.Client // Channel where results are pushed. + results publish.Transactions // Channel where results are pushed. } // getTransaction returns the transaction associated with the given @@ -235,7 +235,7 @@ func (dns *Dns) setFromConfig(config config.Dns) error { return nil } -func (dns *Dns) Init(test_mode bool, results publisher.Client) error { +func (dns *Dns) Init(test_mode bool, results publish.Transactions) error { dns.initDefaults() if !test_mode { dns.setFromConfig(config.ConfigSingleton.Protocols.Dns) @@ -373,7 +373,7 @@ func (dns *Dns) publishTransaction(t *DnsTransaction) { } } - dns.results.PublishEvent(event) + dns.results.PublishTransaction(event) } func (dns *Dns) expireTransaction(t *DnsTransaction) { diff --git a/packetbeat/protos/dns/dns_tcp_test.go b/packetbeat/protos/dns/dns_tcp_test.go index db12324762e..79e535aa59e 100644 --- a/packetbeat/protos/dns/dns_tcp_test.go +++ b/packetbeat/protos/dns/dns_tcp_test.go @@ -15,9 +15,9 @@ import ( "github.com/elastic/beats/packetbeat/protos" "github.com/elastic/beats/packetbeat/protos/tcp" + "github.com/elastic/beats/packetbeat/publish" "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/publisher" "github.com/stretchr/testify/assert" ) @@ -232,7 +232,7 @@ func TestParseTcp_zeroLengthMsgRequest(t *testing.T) { dns.Parse(packet, tcptuple, tcp.TcpDirectionOriginal, private) assert.Empty(t, dns.transactions.Size(), "There should be no transactions.") - client := dns.results.(publisher.ChanClient) + client := dns.results.(*publish.ChanTransactions) close(client.Channel) assert.Nil(t, <-client.Channel, "No result should have been published.") } @@ -271,7 +271,7 @@ func TestParseTcp_emptyPacket(t *testing.T) { dns.Parse(packet, tcptuple, tcp.TcpDirectionOriginal, private) assert.Empty(t, dns.transactions.Size(), "There should be no transactions.") - client := dns.results.(publisher.ChanClient) + client := dns.results.(*publish.ChanTransactions) close(client.Channel) assert.Nil(t, <-client.Channel, "No result should have been published.") } @@ -297,7 +297,7 @@ func TestParseTcp_requestPacket(t *testing.T) { dns.Parse(packet, tcptuple, tcp.TcpDirectionOriginal, private) assert.Equal(t, 1, dns.transactions.Size(), "There should be one transaction.") - client := dns.results.(publisher.ChanClient) + client := dns.results.(*publish.ChanTransactions) close(client.Channel) assert.Nil(t, <-client.Channel, "No result should have been published.") } @@ -456,7 +456,7 @@ func TestGap_requestDrop(t *testing.T) { dns.ReceivedFin(tcptuple, tcp.TcpDirectionOriginal, private) - client := dns.results.(publisher.ChanClient) + client := dns.results.(*publish.ChanTransactions) close(client.Channel) mapStr := <-client.Channel assert.Nil(t, mapStr, "No result should have been published.") @@ -506,7 +506,7 @@ func TestGapFin_validMessage(t *testing.T) { dns.ReceivedFin(tcptuple, tcp.TcpDirectionReverse, private) assert.Equal(t, 1, dns.transactions.Size(), "There should be one transaction.") - client := dns.results.(publisher.ChanClient) + client := dns.results.(*publish.ChanTransactions) close(client.Channel) mapStr := <-client.Channel assert.Nil(t, mapStr, "No result should have been published.") @@ -593,7 +593,7 @@ func benchmarkTcp(b *testing.B, q DnsTestMessage) { packet = newPacket(reverse, q.response) dns.Parse(packet, tcptuple, tcp.TcpDirectionReverse, private) - client := dns.results.(publisher.ChanClient) + client := dns.results.(*publish.ChanTransactions) <-client.Channel } } @@ -611,7 +611,7 @@ func BenchmarkParallelTcpParse(b *testing.B) { rand.Seed(22) numMessages := len(messagesTcp) dns := newDns(false) - client := dns.results.(publisher.ChanClient) + client := dns.results.(*publish.ChanTransactions) // Drain the results channel while the test is running. go func() { diff --git a/packetbeat/protos/dns/dns_test.go b/packetbeat/protos/dns/dns_test.go index 07ceeeb3448..e7de931ef4c 100644 --- a/packetbeat/protos/dns/dns_test.go +++ b/packetbeat/protos/dns/dns_test.go @@ -10,10 +10,10 @@ import ( "time" "github.com/elastic/beats/packetbeat/protos" + "github.com/elastic/beats/packetbeat/publish" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/libbeat/publisher" "github.com/stretchr/testify/assert" ) @@ -60,7 +60,7 @@ func newDns(verbose bool) *Dns { } dns := &Dns{} - err := dns.Init(true, publisher.ChanClient{make(chan common.MapStr, 100)}) + err := dns.Init(true, &publish.ChanTransactions{make(chan common.MapStr, 100)}) if err != nil { return nil } @@ -84,7 +84,7 @@ func newPacket(t common.IpPortTuple, payload []byte) *protos.Packet { // expectResult returns one MapStr result from the Dns results channel. If // no result is available then the test fails. func expectResult(t testing.TB, dns *Dns) common.MapStr { - client := dns.results.(publisher.ChanClient) + client := dns.results.(*publish.ChanTransactions) select { case result := <-client.Channel: return result diff --git a/packetbeat/protos/dns/dns_udp_test.go b/packetbeat/protos/dns/dns_udp_test.go index a6ac8749e73..93d119a8bef 100644 --- a/packetbeat/protos/dns/dns_udp_test.go +++ b/packetbeat/protos/dns/dns_udp_test.go @@ -23,9 +23,9 @@ import ( "time" "github.com/elastic/beats/packetbeat/protos" + "github.com/elastic/beats/packetbeat/publish" "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/publisher" "github.com/stretchr/testify/assert" "github.com/tsg/gopacket/layers" ) @@ -194,7 +194,7 @@ func TestParseUdp_emptyPacket(t *testing.T) { packet := newPacket(forward, []byte{}) dns.ParseUdp(packet) assert.Empty(t, dns.transactions.Size(), "There should be no transactions.") - client := dns.results.(publisher.ChanClient) + client := dns.results.(*publish.ChanTransactions) close(client.Channel) assert.Nil(t, <-client.Channel, "No result should have been published.") } @@ -216,7 +216,7 @@ func TestParseUdp_requestPacket(t *testing.T) { packet := newPacket(forward, elasticA.request) dns.ParseUdp(packet) assert.Equal(t, 1, dns.transactions.Size(), "There should be one transaction.") - client := dns.results.(publisher.ChanClient) + client := dns.results.(*publish.ChanTransactions) close(client.Channel) assert.Nil(t, <-client.Channel, "No result should have been published.") } @@ -333,7 +333,7 @@ func benchmarkUdp(b *testing.B, q DnsTestMessage) { packet = newPacket(reverse, q.response) dns.ParseUdp(packet) - client := dns.results.(publisher.ChanClient) + client := dns.results.(*publish.ChanTransactions) <-client.Channel } } @@ -351,7 +351,7 @@ func BenchmarkParallelUdpParse(b *testing.B) { rand.Seed(22) numMessages := len(messages) dns := newDns(false) - client := dns.results.(publisher.ChanClient) + client := dns.results.(*publish.ChanTransactions) // Drain the results channal while the test is running. go func() { diff --git a/packetbeat/protos/http/http.go b/packetbeat/protos/http/http.go index dd727dcec21..76b7da0e17e 100644 --- a/packetbeat/protos/http/http.go +++ b/packetbeat/protos/http/http.go @@ -9,12 +9,12 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/libbeat/publisher" "github.com/elastic/beats/packetbeat/config" "github.com/elastic/beats/packetbeat/procs" "github.com/elastic/beats/packetbeat/protos" "github.com/elastic/beats/packetbeat/protos/tcp" + "github.com/elastic/beats/packetbeat/publish" ) var debugf = logp.MakeDebug("http") @@ -68,7 +68,7 @@ type HTTP struct { transactionTimeout time.Duration - results publisher.Client + results publish.Transactions } var ( @@ -133,7 +133,7 @@ func (http *HTTP) GetPorts() []int { } // Init initializes the HTTP protocol analyser. -func (http *HTTP) Init(testMode bool, results publisher.Client) error { +func (http *HTTP) Init(testMode bool, results publish.Transactions) error { http.initDefaults() if !testMode { @@ -511,7 +511,7 @@ func (http *HTTP) publishTransaction(event common.MapStr) { if http.results == nil { return } - http.results.PublishEvent(event) + http.results.PublishTransaction(event) } func (http *HTTP) collectHeaders(m *message) interface{} { diff --git a/packetbeat/protos/http/http_test.go b/packetbeat/protos/http/http_test.go index 3e1f23148ea..f3c25270ce1 100644 --- a/packetbeat/protos/http/http_test.go +++ b/packetbeat/protos/http/http_test.go @@ -10,9 +10,9 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/libbeat/publisher" "github.com/elastic/beats/packetbeat/config" "github.com/elastic/beats/packetbeat/protos" + "github.com/elastic/beats/packetbeat/publish" "github.com/stretchr/testify/assert" ) @@ -50,7 +50,7 @@ func (tp *testParser) parse() (*message, bool, bool) { func httpModForTests() *HTTP { var http HTTP - results := publisher.ChanClient{Channel: make(chan common.MapStr, 10)} + results := &publish.ChanTransactions{Channel: make(chan common.MapStr, 10)} http.Init(true, results) return &http } @@ -985,7 +985,7 @@ func testCreateTCPTuple() *common.TcpTuple { // Helper function to read from the Publisher Queue func expectTransaction(t *testing.T, http *HTTP) common.MapStr { - client := http.results.(publisher.ChanClient) + client := http.results.(*publish.ChanTransactions) select { case trans := <-client.Channel: return trans @@ -1202,7 +1202,7 @@ func BenchmarkHttpSimpleTransaction(b *testing.B) { req := protos.Packet{Payload: []byte(data1)} resp := protos.Packet{Payload: []byte(data2)} - client := http.results.(publisher.ChanClient) + client := http.results.(*publish.ChanTransactions) for i := 0; i < b.N; i++ { private := protos.ProtocolData(&httpConnectionData{}) diff --git a/packetbeat/protos/icmp/icmp.go b/packetbeat/protos/icmp/icmp.go index 8d8879e32c2..d1b1f6d04c7 100644 --- a/packetbeat/protos/icmp/icmp.go +++ b/packetbeat/protos/icmp/icmp.go @@ -6,10 +6,10 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/libbeat/publisher" "github.com/elastic/beats/packetbeat/config" "github.com/elastic/beats/packetbeat/protos" + "github.com/elastic/beats/packetbeat/publish" "github.com/tsg/gopacket/layers" ) @@ -33,7 +33,7 @@ type Icmp struct { transactions *common.Cache transactionTimeout time.Duration - results publisher.Client + results publish.Transactions } const ( @@ -49,7 +49,7 @@ const ( orphanedResponseMsg = "Response was received without an associated request." ) -func NewIcmp(testMode bool, results publisher.Client) (*Icmp, error) { +func NewIcmp(testMode bool, results publish.Transactions) (*Icmp, error) { icmp := &Icmp{} icmp.initDefaults() @@ -316,5 +316,5 @@ func (icmp *Icmp) publishTransaction(trans *icmpTransaction) { // } } - icmp.results.PublishEvent(event) + icmp.results.PublishTransaction(event) } diff --git a/packetbeat/protos/icmp/icmp_test.go b/packetbeat/protos/icmp/icmp_test.go index 271a9ac6683..60c8f346e4f 100644 --- a/packetbeat/protos/icmp/icmp_test.go +++ b/packetbeat/protos/icmp/icmp_test.go @@ -7,9 +7,9 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/libbeat/publisher" "github.com/elastic/beats/packetbeat/protos" + "github.com/elastic/beats/packetbeat/publish" "github.com/tsg/gopacket" "github.com/tsg/gopacket/layers" @@ -43,7 +43,7 @@ func BenchmarkIcmpProcessICMPv4(b *testing.B) { logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"icmp", "icmpdetailed"}) } - results := publisher.ChanClient{make(chan common.MapStr, 10)} + results := &publish.ChanTransactions{make(chan common.MapStr, 10)} icmp, err := NewIcmp(true, results) if err != nil { b.Error("Failed to create ICMP processor") @@ -62,7 +62,7 @@ func BenchmarkIcmpProcessICMPv4(b *testing.B) { icmp.ProcessICMPv4(icmpRequestData, packetRequestData) icmp.ProcessICMPv4(icmpResponseData, packetResponseData) - client := icmp.results.(publisher.ChanClient) + client := icmp.results.(*publish.ChanTransactions) <-client.Channel } } diff --git a/packetbeat/protos/memcache/memcache.go b/packetbeat/protos/memcache/memcache.go index 7329d220dde..bd0535daaea 100644 --- a/packetbeat/protos/memcache/memcache.go +++ b/packetbeat/protos/memcache/memcache.go @@ -9,17 +9,17 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/libbeat/publisher" "github.com/elastic/beats/packetbeat/config" "github.com/elastic/beats/packetbeat/protos" "github.com/elastic/beats/packetbeat/protos/applayer" + "github.com/elastic/beats/packetbeat/publish" ) // memcache types type Memcache struct { Ports protos.PortsConfig - results publisher.Client + results publish.Transactions config parserConfig udpMemcache @@ -101,7 +101,7 @@ type memcacheStat struct { var debug = logp.MakeDebug("memcache") // Called to initialize the Plugin -func (mc *Memcache) Init(testMode bool, results publisher.Client) error { +func (mc *Memcache) Init(testMode bool, results publish.Transactions) error { debug("init memcache plugin") return mc.InitWithConfig( config.ConfigSingleton.Protocols.Memcache, @@ -120,7 +120,7 @@ func (mc *Memcache) InitDefaults() { func (mc *Memcache) InitWithConfig( config config.Memcache, testMode bool, - results publisher.Client, + results publish.Transactions, ) error { mc.InitDefaults() if !testMode { @@ -175,7 +175,7 @@ func (mc *Memcache) onTransaction(t *transaction) { event := common.MapStr{} t.Event(event) debug("publish event: %s", event) - mc.results.PublishEvent(event) + mc.results.PublishTransaction(event) } func newMessage(ts time.Time) *message { diff --git a/packetbeat/protos/mongodb/mongodb.go b/packetbeat/protos/mongodb/mongodb.go index e43fb6bc269..8d1d71e8e05 100644 --- a/packetbeat/protos/mongodb/mongodb.go +++ b/packetbeat/protos/mongodb/mongodb.go @@ -7,11 +7,11 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/libbeat/publisher" "github.com/elastic/beats/packetbeat/config" "github.com/elastic/beats/packetbeat/procs" "github.com/elastic/beats/packetbeat/protos" "github.com/elastic/beats/packetbeat/protos/tcp" + "github.com/elastic/beats/packetbeat/publish" ) var debugf = logp.MakeDebug("mongodb") @@ -28,7 +28,7 @@ type Mongodb struct { responses *common.Cache transactionTimeout time.Duration - results publisher.Client + results publish.Transactions } type transactionKey struct { @@ -69,7 +69,7 @@ func (mongodb *Mongodb) GetPorts() []int { return mongodb.Ports } -func (mongodb *Mongodb) Init(test_mode bool, results publisher.Client) error { +func (mongodb *Mongodb) Init(test_mode bool, results publish.Transactions) error { debugf("Init a MongoDB protocol parser") mongodb.InitDefaults() @@ -426,5 +426,5 @@ func (mongodb *Mongodb) publishTransaction(t *transaction) { } } - mongodb.results.PublishEvent(event) + mongodb.results.PublishTransaction(event) } diff --git a/packetbeat/protos/mongodb/mongodb_test.go b/packetbeat/protos/mongodb/mongodb_test.go index 04dd7793320..5c677f2fa66 100644 --- a/packetbeat/protos/mongodb/mongodb_test.go +++ b/packetbeat/protos/mongodb/mongodb_test.go @@ -7,8 +7,8 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/libbeat/publisher" "github.com/elastic/beats/packetbeat/protos" + "github.com/elastic/beats/packetbeat/publish" "github.com/stretchr/testify/assert" ) @@ -16,7 +16,7 @@ import ( // in tests. It publishes the transactions in the results channel. func MongodbModForTests() *Mongodb { var mongodb Mongodb - results := publisher.ChanClient{make(chan common.MapStr, 10)} + results := &publish.ChanTransactions{make(chan common.MapStr, 10)} mongodb.Init(true, results) return &mongodb } @@ -35,7 +35,7 @@ func testTcpTuple() *common.TcpTuple { // Helper function to read from the results Queue. Raises // an error if nothing is found in the queue. func expectTransaction(t *testing.T, mongodb *Mongodb) common.MapStr { - client := mongodb.results.(publisher.ChanClient) + client := mongodb.results.(*publish.ChanTransactions) select { case trans := <-client.Channel: return trans diff --git a/packetbeat/protos/mysql/mysql.go b/packetbeat/protos/mysql/mysql.go index 9ba5ff4866b..bfca6ea354d 100644 --- a/packetbeat/protos/mysql/mysql.go +++ b/packetbeat/protos/mysql/mysql.go @@ -8,12 +8,12 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/libbeat/publisher" "github.com/elastic/beats/packetbeat/config" "github.com/elastic/beats/packetbeat/procs" "github.com/elastic/beats/packetbeat/protos" "github.com/elastic/beats/packetbeat/protos/tcp" + "github.com/elastic/beats/packetbeat/publish" ) // Packet types @@ -123,7 +123,7 @@ type Mysql struct { transactions *common.Cache transactionTimeout time.Duration - results publisher.Client + results publish.Transactions // function pointer for mocking handleMysql func(mysql *Mysql, m *MysqlMessage, tcp *common.TcpTuple, @@ -172,7 +172,7 @@ func (mysql *Mysql) GetPorts() []int { return mysql.Ports } -func (mysql *Mysql) Init(test_mode bool, results publisher.Client) error { +func (mysql *Mysql) Init(test_mode bool, results publish.Transactions) error { mysql.InitDefaults() if !test_mode { @@ -870,7 +870,7 @@ func (mysql *Mysql) publishTransaction(t *MysqlTransaction) { event["src"] = &t.Src event["dst"] = &t.Dst - mysql.results.PublishEvent(event) + mysql.results.PublishTransaction(event) } func read_lstring(data []byte, offset int) ([]byte, int, bool, error) { diff --git a/packetbeat/protos/mysql/mysql_test.go b/packetbeat/protos/mysql/mysql_test.go index f9868963839..0f3c7539ff3 100644 --- a/packetbeat/protos/mysql/mysql_test.go +++ b/packetbeat/protos/mysql/mysql_test.go @@ -7,17 +7,17 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/libbeat/publisher" "github.com/stretchr/testify/assert" "github.com/elastic/beats/packetbeat/protos" + "github.com/elastic/beats/packetbeat/publish" "time" ) func MysqlModForTests() *Mysql { var mysql Mysql - results := publisher.ChanClient{make(chan common.MapStr, 10)} + results := &publish.ChanTransactions{make(chan common.MapStr, 10)} mysql.Init(true, results) return &mysql } @@ -464,7 +464,7 @@ func testTcpTuple() *common.TcpTuple { // Helper function to read from the Publisher Queue func expectTransaction(t *testing.T, mysql *Mysql) common.MapStr { - client := mysql.results.(publisher.ChanClient) + client := mysql.results.(*publish.ChanTransactions) select { case trans := <-client.Channel: return trans diff --git a/packetbeat/protos/pgsql/pgsql.go b/packetbeat/protos/pgsql/pgsql.go index d4cb2fdc459..b2b9a602258 100644 --- a/packetbeat/protos/pgsql/pgsql.go +++ b/packetbeat/protos/pgsql/pgsql.go @@ -6,12 +6,12 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/libbeat/publisher" "github.com/elastic/beats/packetbeat/config" "github.com/elastic/beats/packetbeat/procs" "github.com/elastic/beats/packetbeat/protos" "github.com/elastic/beats/packetbeat/protos/tcp" + "github.com/elastic/beats/packetbeat/publish" ) type PgsqlMessage struct { @@ -99,7 +99,7 @@ type Pgsql struct { transactions *common.Cache transactionTimeout time.Duration - results publisher.Client + results publish.Transactions // function pointer for mocking handlePgsql func(pgsql *Pgsql, m *PgsqlMessage, tcp *common.TcpTuple, @@ -148,7 +148,7 @@ func (pgsql *Pgsql) GetPorts() []int { return pgsql.Ports } -func (pgsql *Pgsql) Init(test_mode bool, results publisher.Client) error { +func (pgsql *Pgsql) Init(test_mode bool, results publish.Transactions) error { pgsql.InitDefaults() if !test_mode { @@ -940,7 +940,7 @@ func (pgsql *Pgsql) publishTransaction(t *PgsqlTransaction) { event["notes"] = t.Notes } - pgsql.results.PublishEvent(event) + pgsql.results.PublishTransaction(event) } func (pgsql *Pgsql) removeTransaction(transList []*PgsqlTransaction, diff --git a/packetbeat/protos/pgsql/pgsql_test.go b/packetbeat/protos/pgsql/pgsql_test.go index bb702ddbaca..1c425f52dba 100644 --- a/packetbeat/protos/pgsql/pgsql_test.go +++ b/packetbeat/protos/pgsql/pgsql_test.go @@ -8,15 +8,15 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/libbeat/publisher" "github.com/stretchr/testify/assert" "github.com/elastic/beats/packetbeat/protos" + "github.com/elastic/beats/packetbeat/publish" ) func PgsqlModForTests() *Pgsql { var pgsql Pgsql - results := publisher.ChanClient{make(chan common.MapStr, 10)} + results := &publish.ChanTransactions{make(chan common.MapStr, 10)} pgsql.Init(true, results) return &pgsql } @@ -293,7 +293,7 @@ func testTcpTuple() *common.TcpTuple { // Helper function to read from the Publisher Queue func expectTransaction(t *testing.T, pgsql *Pgsql) common.MapStr { - client := pgsql.results.(publisher.ChanClient) + client := pgsql.results.(*publish.ChanTransactions) select { case trans := <-client.Channel: return trans diff --git a/packetbeat/protos/protos.go b/packetbeat/protos/protos.go index b122e4eb0e9..46f49229dda 100644 --- a/packetbeat/protos/protos.go +++ b/packetbeat/protos/protos.go @@ -9,7 +9,7 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/libbeat/publisher" + "github.com/elastic/beats/packetbeat/publish" ) const ( @@ -59,7 +59,7 @@ func validatePorts(ports []int) error { // Functions to be exported by a protocol plugin type ProtocolPlugin interface { // Called to initialize the Plugin - Init(test_mode bool, results publisher.Client) error + Init(test_mode bool, results publish.Transactions) error // Called to return the configured ports GetPorts() []int diff --git a/packetbeat/protos/protos_test.go b/packetbeat/protos/protos_test.go index b95926eec78..0153f5b43a2 100644 --- a/packetbeat/protos/protos_test.go +++ b/packetbeat/protos/protos_test.go @@ -5,7 +5,7 @@ import ( "time" "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/publisher" + "github.com/elastic/beats/packetbeat/publish" "github.com/stretchr/testify/assert" ) @@ -16,7 +16,7 @@ type TestProtocol struct { type TcpProtocol TestProtocol -func (proto *TcpProtocol) Init(test_mode bool, results publisher.Client) error { +func (proto *TcpProtocol) Init(test_mode bool, results publish.Transactions) error { return nil } @@ -43,7 +43,7 @@ func (proto *TcpProtocol) ConnectionTimeout() time.Duration { return 0 } type UdpProtocol TestProtocol -func (proto *UdpProtocol) Init(test_mode bool, results publisher.Client) error { +func (proto *UdpProtocol) Init(test_mode bool, results publish.Transactions) error { return nil } @@ -57,7 +57,7 @@ func (proto *UdpProtocol) ParseUdp(pkt *Packet) { type TcpUdpProtocol TestProtocol -func (proto *TcpUdpProtocol) Init(test_mode bool, results publisher.Client) error { +func (proto *TcpUdpProtocol) Init(test_mode bool, results publish.Transactions) error { return nil } diff --git a/packetbeat/protos/redis/redis.go b/packetbeat/protos/redis/redis.go index 6341ab0413b..e5e63e65ad3 100644 --- a/packetbeat/protos/redis/redis.go +++ b/packetbeat/protos/redis/redis.go @@ -6,13 +6,13 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/libbeat/publisher" "github.com/elastic/beats/packetbeat/config" "github.com/elastic/beats/packetbeat/procs" "github.com/elastic/beats/packetbeat/protos" "github.com/elastic/beats/packetbeat/protos/applayer" "github.com/elastic/beats/packetbeat/protos/tcp" + "github.com/elastic/beats/packetbeat/publish" ) type stream struct { @@ -40,7 +40,7 @@ type Redis struct { transactionTimeout time.Duration - results publisher.Client + results publish.Transactions } var ( @@ -73,7 +73,7 @@ func (redis *Redis) GetPorts() []int { return redis.Ports } -func (redis *Redis) Init(test_mode bool, results publisher.Client) error { +func (redis *Redis) Init(test_mode bool, results publish.Transactions) error { redis.InitDefaults() if !test_mode { redis.setFromConfig(config.ConfigSingleton.Protocols.Redis) @@ -242,7 +242,7 @@ func (redis *Redis) correlate(conn *redisConnectionData) { if redis.results != nil { event := redis.newTransaction(requ, resp) - redis.results.PublishEvent(event) + redis.results.PublishTransaction(event) } } } diff --git a/packetbeat/protos/tcp/tcp_test.go b/packetbeat/protos/tcp/tcp_test.go index 82670f6297a..63f81e98864 100644 --- a/packetbeat/protos/tcp/tcp_test.go +++ b/packetbeat/protos/tcp/tcp_test.go @@ -8,8 +8,8 @@ import ( "time" "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/publisher" "github.com/elastic/beats/packetbeat/protos" + "github.com/elastic/beats/packetbeat/publish" "github.com/stretchr/testify/assert" "github.com/tsg/gopacket/layers" @@ -25,14 +25,14 @@ const ( type TestProtocol struct { Ports []int - init func(testMode bool, results publisher.Client) error + init func(testMode bool, results publish.Transactions) error parse func(*protos.Packet, *common.TcpTuple, uint8, protos.ProtocolData) protos.ProtocolData onFin func(*common.TcpTuple, uint8, protos.ProtocolData) protos.ProtocolData gap func(*common.TcpTuple, uint8, int, protos.ProtocolData) (protos.ProtocolData, bool) } var _ protos.ProtocolPlugin = &TestProtocol{ - init: func(m bool, r publisher.Client) error { return nil }, + init: func(m bool, r publish.Transactions) error { return nil }, parse: func(p *protos.Packet, t *common.TcpTuple, d uint8, priv protos.ProtocolData) protos.ProtocolData { return priv }, @@ -44,7 +44,7 @@ var _ protos.ProtocolPlugin = &TestProtocol{ }, } -func (proto *TestProtocol) Init(test_mode bool, results publisher.Client) error { +func (proto *TestProtocol) Init(test_mode bool, results publish.Transactions) error { return proto.init(test_mode, results) } diff --git a/packetbeat/protos/thrift/thrift.go b/packetbeat/protos/thrift/thrift.go index 9d3bb39e690..ef9e6d596c2 100644 --- a/packetbeat/protos/thrift/thrift.go +++ b/packetbeat/protos/thrift/thrift.go @@ -12,12 +12,12 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/libbeat/publisher" "github.com/elastic/beats/packetbeat/config" "github.com/elastic/beats/packetbeat/procs" "github.com/elastic/beats/packetbeat/protos" "github.com/elastic/beats/packetbeat/protos/tcp" + "github.com/elastic/beats/packetbeat/publish" ) type ThriftMessage struct { @@ -154,7 +154,7 @@ type Thrift struct { transactionTimeout time.Duration PublishQueue chan *ThriftTransaction - results publisher.Client + results publish.Transactions Idl *ThriftIdl } @@ -241,7 +241,7 @@ func (thrift *Thrift) GetPorts() []int { return thrift.Ports } -func (thrift *Thrift) Init(test_mode bool, results publisher.Client) error { +func (thrift *Thrift) Init(test_mode bool, results publish.Transactions) error { thrift.InitDefaults() @@ -1131,7 +1131,7 @@ func (thrift *Thrift) publishTransactions() { event["dst"] = &t.Dst if thrift.results != nil { - thrift.results.PublishEvent(event) + thrift.results.PublishTransaction(event) } logp.Debug("thrift", "Published event") diff --git a/packetbeat/protos/udp/udp_test.go b/packetbeat/protos/udp/udp_test.go index 2ca9f00a286..968d5de8e5f 100644 --- a/packetbeat/protos/udp/udp_test.go +++ b/packetbeat/protos/udp/udp_test.go @@ -7,8 +7,8 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/libbeat/publisher" "github.com/elastic/beats/packetbeat/protos" + "github.com/elastic/beats/packetbeat/publish" "github.com/stretchr/testify/assert" ) @@ -56,7 +56,7 @@ type TestProtocol struct { pkt *protos.Packet // UDP packet that the plugin was called to process. } -func (proto *TestProtocol) Init(test_mode bool, results publisher.Client) error { +func (proto *TestProtocol) Init(test_mode bool, results publish.Transactions) error { return nil } diff --git a/packetbeat/publish/publish.go b/packetbeat/publish/publish.go new file mode 100644 index 00000000000..469b8a7b2f8 --- /dev/null +++ b/packetbeat/publish/publish.go @@ -0,0 +1,187 @@ +package publish + +import ( + "errors" + "fmt" + "sync" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/publisher" +) + +type Transactions interface { + PublishTransaction(common.MapStr) bool +} + +type PacketbeatPublisher struct { + pub *publisher.PublisherType + client publisher.Client + + wg sync.WaitGroup + events chan common.MapStr + done chan struct{} +} + +type ChanTransactions struct { + Channel chan common.MapStr +} + +func (t *ChanTransactions) PublishTransaction(event common.MapStr) bool { + t.Channel <- event + return true +} + +var debugf = logp.MakeDebug("publish") + +func NewPublisher(pub *publisher.PublisherType, hwm int) *PacketbeatPublisher { + return &PacketbeatPublisher{ + pub: pub, + client: pub.Client(), + done: make(chan struct{}), + events: make(chan common.MapStr, hwm), + } +} + +func (t *PacketbeatPublisher) PublishTransaction(event common.MapStr) bool { + select { + case t.events <- event: + return true + default: + return false + } +} + +func (t *PacketbeatPublisher) Start() { + t.wg.Add(1) + go func() { + defer t.wg.Done() + + for { + select { + case event := <-t.events: + t.onEvent(event) + case <-t.done: + return + } + } + }() +} + +func (t *PacketbeatPublisher) Stop() { + close(t.done) + t.wg.Wait() +} + +func (t *PacketbeatPublisher) onEvent(event common.MapStr) { + if err := validateEvent(event); err != nil { + logp.Warn("Dropping invalid event: %v", err) + return + } + + debugf("on event") + + if !updateEventAddresses(t.pub, event) { + return + } + + t.client.PublishEvent(event) +} + +// filterEvent validates an event for common required fields with types. +// If event is to be filtered out the reason is returned as error. +func validateEvent(event common.MapStr) error { + ts, ok := event["@timestamp"] + if !ok { + return errors.New("missing '@timestamp' field from event") + } + + _, ok = ts.(common.Time) + if !ok { + return errors.New("invalid '@timestamp' field from event") + } + + err := event.EnsureCountField() + if err != nil { + return err + } + + t, ok := event["type"] + if !ok { + return errors.New("missing 'type' field from event") + } + + _, ok = t.(string) + if !ok { + return errors.New("invalid 'type' field from event") + } + + return nil +} + +func updateEventAddresses(pub *publisher.PublisherType, event common.MapStr) bool { + var srcServer, dstServer string + src, ok := event["src"].(*common.Endpoint) + + if ok { + // check if it's outgoing transaction (as client) + isOutgoing := pub.IsPublisherIP(src.Ip) + if isOutgoing { + if pub.IgnoreOutgoing { + // duplicated transaction -> ignore it + debugf("Ignore duplicated transaction on: %s -> %s", srcServer, dstServer) + return false + } + + //outgoing transaction + event["direction"] = "out" + } + + srcServer = pub.GetServerName(src.Ip) + event["client_ip"] = src.Ip + event["client_port"] = src.Port + event["client_proc"] = src.Proc + event["client_server"] = srcServer + delete(event, "src") + } + + dst, ok := event["dst"].(*common.Endpoint) + if ok { + dstServer = pub.GetServerName(dst.Ip) + event["ip"] = dst.Ip + event["port"] = dst.Port + event["proc"] = dst.Proc + event["server"] = dstServer + delete(event, "dst") + + //check if it's incoming transaction (as server) + if pub.IsPublisherIP(dst.Ip) { + // incoming transaction + event["direction"] = "in" + } + + } + + event.EnsureCountField() + + if pub.GeoLite != nil { + realIP, exists := event["real_ip"] + if exists && len(realIP.(common.NetString)) > 0 { + loc := pub.GeoLite.GetLocationByIP(string(realIP.(common.NetString))) + if loc != nil && loc.Latitude != 0 && loc.Longitude != 0 { + loc := fmt.Sprintf("%f, %f", loc.Latitude, loc.Longitude) + event["client_location"] = loc + } + } else { + if len(srcServer) == 0 && src != nil { // only for external IP addresses + loc := pub.GeoLite.GetLocationByIP(src.Ip) + if loc != nil && loc.Latitude != 0 && loc.Longitude != 0 { + loc := fmt.Sprintf("%f, %f", loc.Latitude, loc.Longitude) + event["client_location"] = loc + } + } + } + } + + return true +} diff --git a/libbeat/publisher/preprocess_test.go b/packetbeat/publish/publish_test.go similarity index 70% rename from libbeat/publisher/preprocess_test.go rename to packetbeat/publish/publish_test.go index 0f289a4c81c..b511e2f5e53 100644 --- a/libbeat/publisher/preprocess_test.go +++ b/packetbeat/publish/publish_test.go @@ -1,13 +1,23 @@ -package publisher +package publish import ( "testing" "time" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/publisher" "github.com/stretchr/testify/assert" ) +func testEvent() common.MapStr { + event := common.MapStr{} + event["@timestamp"] = common.Time(time.Now()) + event["type"] = "test" + event["src"] = &common.Endpoint{} + event["dst"] = &common.Endpoint{} + return event +} + // Test that FilterEvent detects events that do not contain the required fields // and returns error. func TestFilterEvent(t *testing.T) { @@ -23,36 +33,34 @@ func TestFilterEvent(t *testing.T) { m := testEvent() m["@timestamp"] = time.Now() return m - }, "Invalid '@timestamp'"}, + }, "invalid '@timestamp'"}, {func() common.MapStr { m := testEvent() delete(m, "@timestamp") return m - }, "Missing '@timestamp'"}, + }, "missing '@timestamp'"}, {func() common.MapStr { m := testEvent() delete(m, "type") return m - }, "Missing 'type'"}, + }, "missing 'type'"}, {func() common.MapStr { m := testEvent() m["type"] = 123 return m - }, "Invalid 'type'"}, + }, "invalid 'type'"}, } for _, test := range testCases { - assert.Regexp(t, test.err, filterEvent(test.f())) + assert.Regexp(t, test.err, validateEvent(test.f())) } } func TestDirectionOut(t *testing.T) { - publisher := PublisherType{} - - publisher.ipaddrs = []string{"192.145.2.4"} + publisher := newTestPublisher([]string{"192.145.2.4"}) event := common.MapStr{ "src": &common.Endpoint{ @@ -71,15 +79,13 @@ func TestDirectionOut(t *testing.T) { }, } - assert.True(t, updateEventAddresses(&publisher, event)) + assert.True(t, updateEventAddresses(publisher, event)) assert.True(t, event["client_ip"] == "192.145.2.4") assert.True(t, event["direction"] == "out") } func TestDirectionIn(t *testing.T) { - publisher := PublisherType{} - - publisher.ipaddrs = []string{"192.145.2.5"} + publisher := newTestPublisher([]string{"192.145.2.5"}) event := common.MapStr{ "src": &common.Endpoint{ @@ -98,15 +104,19 @@ func TestDirectionIn(t *testing.T) { }, } - assert.True(t, updateEventAddresses(&publisher, event)) + assert.True(t, updateEventAddresses(publisher, event)) assert.True(t, event["client_ip"] == "192.145.2.4") assert.True(t, event["direction"] == "in") } -func TestNoDirection(t *testing.T) { - publisher := PublisherType{} +func newTestPublisher(ips []string) *publisher.PublisherType { + p := &publisher.PublisherType{} + p.IpAddrs = ips + return p +} - publisher.ipaddrs = []string{"192.145.2.6"} +func TestNoDirection(t *testing.T) { + publisher := newTestPublisher([]string{"192.145.2.6"}) event := common.MapStr{ "src": &common.Endpoint{ @@ -125,7 +135,7 @@ func TestNoDirection(t *testing.T) { }, } - assert.True(t, updateEventAddresses(&publisher, event)) + assert.True(t, updateEventAddresses(publisher, event)) assert.True(t, event["client_ip"] == "192.145.2.4") _, ok := event["direction"] assert.False(t, ok) diff --git a/packetbeat/tests/system/test_0013_redis_basic.py b/packetbeat/tests/system/test_0013_redis_basic.py index c051aa8c979..ed4e465b5ee 100644 --- a/packetbeat/tests/system/test_0013_redis_basic.py +++ b/packetbeat/tests/system/test_0013_redis_basic.py @@ -14,7 +14,7 @@ def test_redis_session(self): self.render_config_template( redis_ports=[6380] ) - self.run_packetbeat(pcap="redis_session.pcap") + self.run_packetbeat(pcap="redis_session.pcap", debug_selectors=["*"]) objs = self.read_output() assert all([o["type"] == "redis" for o in objs]) diff --git a/topbeat/beat/topbeat.go b/topbeat/beat/topbeat.go index e61e927c07d..50ca2469607 100644 --- a/topbeat/beat/topbeat.go +++ b/topbeat/beat/topbeat.go @@ -224,6 +224,7 @@ func (t *Topbeat) exportProcStats() error { "@timestamp": common.Time(time.Now()), "type": "process", "proc": proc, + "count": 1, } t.events.PublishEvent(event) @@ -275,6 +276,7 @@ func (t *Topbeat) exportSystemStats() error { "cpu": cpu_stat, "mem": mem_stat, "swap": swap_stat, + "count": 1, } if t.cpuPerCore { @@ -317,6 +319,7 @@ func collectFileSystemStats(fss []sigar.FileSystem) []common.MapStr { "@timestamp": common.Time(time.Now()), "type": "filesystem", "fs": fsStat, + "count": 1, } events = append(events, event) } diff --git a/winlogbeat/eventlog/eventlog.go b/winlogbeat/eventlog/eventlog.go index 0b3fce3f9c5..30d9805e26c 100644 --- a/winlogbeat/eventlog/eventlog.go +++ b/winlogbeat/eventlog/eventlog.go @@ -76,6 +76,7 @@ func (r Record) ToMapStr() common.MapStr { "event_id": r.EventID, "level": r.Level, "type": r.API, + "count": 1, } if r.Message != "" {