Skip to content

Commit

Permalink
Use a separate audit client for lost event monitoring (#7561)
Browse files Browse the repository at this point in the history
A single audit client was used both for receiving events and for
periodically checking the audit status for lost events. This meant that
when the publishing pipeline blocked, no lost-events statistic was
generated. Now a secondary audit client is used to run this
periodic polling.
  • Loading branch information
adriansr authored and andrewkroh committed Jul 12, 2018
1 parent 947221e commit 25df531
Showing 1 changed file with 15 additions and 24 deletions.
39 changes: 15 additions & 24 deletions auditbeat/module/auditd/audit_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,27 +147,33 @@ func (ms *MetricSet) Run(reporter mb.PushReporterV2) {
return
}

out, statusC, err := ms.receiveEvents(reporter.Done())
out, err := ms.receiveEvents(reporter.Done())
if err != nil {
reporter.Error(err)
ms.log.Errorw("Failure receiving audit events", "error", err)
return
}

if ms.kernelLost.enabled {
client, err := libaudit.NewAuditClient(nil)
if err != nil {
reporter.Error(err)
ms.log.Errorw("Failure creating audit monitoring client", "error", err)
}
go func() {
defer client.Close()
timer := time.NewTicker(lostEventsUpdateInterval)
defer timer.Stop()
for {
select {
case <-reporter.Done():
return
case <-timer.C:
if _, err := ms.client.GetStatusAsync(false); err != nil {
ms.log.Error("get async status request failed:", err)
if status, err := client.GetStatus(); err == nil {
ms.updateKernelLostMetric(status.Lost)
} else {
ms.log.Error("get status request failed:", err)
}
case status := <-statusC:
ms.updateKernelLostMetric(status.Lost)
}
}
}()
Expand Down Expand Up @@ -360,13 +366,12 @@ func (ms *MetricSet) updateKernelLostMetric(lost uint32) {
ms.kernelLost.counter = lost
}

func (ms *MetricSet) receiveEvents(done <-chan struct{}) (<-chan []*auparse.AuditMessage, <-chan *libaudit.AuditStatus, error) {
func (ms *MetricSet) receiveEvents(done <-chan struct{}) (<-chan []*auparse.AuditMessage, error) {
if err := ms.initClient(); err != nil {
return nil, nil, err
return nil, err
}

out := make(chan []*auparse.AuditMessage, ms.config.StreamBufferQueueSize)
statusC := make(chan *libaudit.AuditStatus, 8)

var st libaudit.Stream = &stream{done, out}
if ms.backpressureStrategy&bsUserSpace != 0 {
Expand All @@ -379,14 +384,13 @@ func (ms *MetricSet) receiveEvents(done <-chan struct{}) (<-chan []*auparse.Audi
}
reassembler, err := libaudit.NewReassembler(int(ms.config.ReassemblerMaxInFlight), ms.config.ReassemblerTimeout, st)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to create Reassembler")
return nil, errors.Wrap(err, "failed to create Reassembler")
}
go maintain(done, reassembler)

go func() {
defer ms.log.Debug("receiveEvents goroutine exited")
defer close(out)
defer close(statusC)
defer reassembler.Close()

for {
Expand All @@ -398,19 +402,6 @@ func (ms *MetricSet) receiveEvents(done <-chan struct{}) (<-chan []*auparse.Audi
}
continue
}
if raw.Type == auparse.AUDIT_GET {
status := &libaudit.AuditStatus{}
if err := status.FromWireFormat([]byte(raw.Data)); err == nil {
select {
case statusC <- status:
default:
ms.log.Debugf("Dropped audit status reply (channel busy)")
}
} else {
ms.log.Error("Decoding status message:", err)
}
continue
}

if filterRecordType(raw.Type) {
continue
Expand All @@ -426,7 +417,7 @@ func (ms *MetricSet) receiveEvents(done <-chan struct{}) (<-chan []*auparse.Audi
}
}()

return out, statusC, nil
return out, nil
}

// maintain periodically evicts timed-out events from the Reassembler. This
Expand Down

0 comments on commit 25df531

Please sign in to comment.