Skip to content

Commit

Permalink
Merge pull request #113 from urso/fix/event-fields
Browse files Browse the repository at this point in the history
fix published event fields
  • Loading branch information
ruflin committed Oct 20, 2015
2 parents e301ebd + 7d5e4b6 commit e473025
Showing 1 changed file with 4 additions and 18 deletions.
22 changes: 4 additions & 18 deletions beat/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,20 +120,18 @@ func (fb *Filebeat) Stop() {
}

func Publish(beat *beat.Beat, fb *Filebeat) {

// Receives events from spool during flush
for events := range fb.publisherChan {
logp.Debug("filebeat", "Send events to output")

pubEvents := make([]common.MapStr, 0, len(events))

logp.Debug("filebeat", "Send events to output")
for _, event := range events {
bEvent := common.MapStr{
"timestamp": common.Time(time.Now()),
"source": event.Source,
"offset": event.Offset,
"message": event.Line,
"text": event.Text,
"line": event.Line,
"message": event.Text,
"fields": event.Fields,
"fileinfo": event.Fileinfo,
"type": "log",
Expand All @@ -142,23 +140,11 @@ func Publish(beat *beat.Beat, fb *Filebeat) {
pubEvents = append(pubEvents, bEvent)
}

publishEvents(beat.Events, pubEvents)
beat.Events.PublishEvents(pubEvents, publisher.Sync)

logp.Debug("filebeat", "Events sent: %d", len(events))

// Tell the registrar that we've successfully sent these events
fb.registrar.Channel <- events
}
}

func publishEvents(client publisher.Client, events []common.MapStr) {

// Sends event to beat (outputs).
// Wait/Repeat until all events are published
for {
ok := client.PublishEvents(events, publisher.Confirm)
if ok {
break
}
}
}

0 comments on commit e473025

Please sign in to comment.