Update file_integrity metricset logging
andrewkroh authored and ph committed Dec 23, 2017
1 parent b87b663 commit 04faf07
Showing 5 changed files with 101 additions and 89 deletions.
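The commit replaces the module's package-level, printf-style logging (logp.Warn, logp.Err, and a debugf helper built with logp.MakeDebug) with per-component *logp.Logger instances and structured key-value calls (Infow, Warnw, Errorw, Debugw). A minimal sketch of the before/after pattern, assuming the logp API and import path used by Beats around the time of this commit; the path value is illustrative:

```go
package main

import (
	"os"

	"github.com/elastic/beats/libbeat/logp"
)

func main() {
	// Old style (removed by this commit): package-level calls with a
	// hand-rolled "[file_integrity]" prefix and printf formatting, e.g.
	//   logp.Warn("%v failed to get file info for '%s': %v", logPrefix, path, err)

	// New style: a named logger per component, with structured fields.
	log := logp.NewLogger("file_integrity")

	path := "/does/not/exist" // illustrative path
	if _, err := os.Stat(path); err != nil {
		log.Warnw("Failed to get file info", "file_path", path, "error", err)
	}
}
```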
48 changes: 21 additions & 27 deletions auditbeat/module/file_integrity/event.go
@@ -7,7 +7,6 @@ import (
"crypto/sha256"
"crypto/sha512"
"encoding/hex"
"encoding/json"
"fmt"
"hash"
"io"
@@ -92,13 +91,13 @@ func (d Digest) MarshalText() ([]byte, error) { return []byte(d.String()), nil }

// Event describe the filesystem change and includes metadata about the file.
type Event struct {
Timestamp time.Time // Time of event.
Path string // The path associated with the event.
TargetPath string // Target path for symlinks.
Info *Metadata // File metadata (if the file exists).
Source Source // Source of the event.
Action Action // Action (like created, updated).
Hashes map[HashType]Digest // File hashes.
Timestamp time.Time `json:"timestamp"` // Time of event.
Path string `json:"path"` // The path associated with the event.
TargetPath string `json:"target_path,omitempty"` // Target path for symlinks.
Info *Metadata `json:"info"` // File metadata (if the file exists).
Source Source `json:"source"` // Source of the event.
Action Action `json:"action"` // Action (like created, updated).
Hashes map[HashType]Digest `json:"hash,omitempty"` // File hashes.

// Metadata
rtt time.Duration // Time taken to collect the info.
@@ -107,20 +106,20 @@ type Event struct {

// Metadata contains file metadata.
type Metadata struct {
Inode uint64
UID uint32
GID uint32
SID string
Owner string
Group string
Size uint64
MTime time.Time // Last modification time.
CTime time.Time // Last metadata change time.
Type Type // File type (dir, file, symlink).
Mode os.FileMode // Permissions
SetUID bool // setuid bit (POSIX only)
SetGID bool // setgid bit (POSIX only)
Origin []string // External origin info for the file (MacOS only)
Inode uint64 `json:"inode"`
UID uint32 `json:"uid"`
GID uint32 `json:"gid"`
SID string `json:"sid"`
Owner string `json:"owner"`
Group string `json:"group"`
Size uint64 `json:"size"`
MTime time.Time `json:"mtime"` // Last modification time.
CTime time.Time `json:"ctime"` // Last metadata change time.
Type Type `json:"type"` // File type (dir, file, symlink).
Mode os.FileMode `json:"mode"` // Permissions
SetUID bool `json:"setuid"` // setuid bit (POSIX only)
SetGID bool `json:"setgid"` // setgid bit (POSIX only)
Origin []string `json:"origin"` // External origin info for the file (MacOS only)
}

// NewEventFromFileInfo creates a new Event based on data from a os.FileInfo
@@ -197,11 +196,6 @@ func NewEvent(
return NewEventFromFileInfo(path, info, err, action, source, maxFileSize, hashTypes)
}

func (e *Event) String() string {
data, _ := json.Marshal(e)
return string(data)
}

func buildMetricbeatEvent(e *Event, existedBefore bool) mb.Event {
m := common.MapStr{
"path": e.Path,
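In event.go the JSON struct tags replace the removed String() method (which marshaled the whole event with encoding/json); the tags now control field names wherever an Event is serialized, such as the structured old/new debug output added in metricset.go. A small sketch of how tags like these shape the output, using a simplified stand-in for the real Event type:

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// event is a simplified stand-in for file_integrity.Event, illustrating the
// tag style used in the diff above.
type event struct {
	Timestamp  time.Time `json:"timestamp"`
	Path       string    `json:"path"`
	TargetPath string    `json:"target_path,omitempty"` // dropped when empty
}

func main() {
	e := event{Timestamp: time.Now(), Path: "/etc/hosts"}

	out, err := json.Marshal(e)
	if err != nil {
		panic(err)
	}
	// Prints something like:
	//   {"timestamp":"2017-12-23T10:00:00Z","path":"/etc/hosts"}
	// target_path is omitted because of the omitempty option.
	fmt.Println(string(out))
}
```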
20 changes: 14 additions & 6 deletions auditbeat/module/file_integrity/eventreader_fsevents.go
@@ -26,6 +26,7 @@ type fsreader struct {
config Config
eventC chan Event
watchedDirs []os.FileInfo
log *logp.Logger
}

var flagToAction = map[fsevents.EventFlags]Action{
@@ -95,13 +96,14 @@ func NewEventReader(c Config) (EventProducer, error) {
stream.Flags |= fsevents.IgnoreSelf
}

log := logp.NewLogger(moduleName)
var dirs []os.FileInfo
if !c.Recursive {
for _, path := range c.Paths {
if info, err := getFileInfo(path); err == nil {
dirs = append(dirs, info)
} else {
logp.Warn("%v failed to get file info for '%s': %v", logPrefix, path, err)
log.Warnw("Failed to get file info", "file_path", path, "error", err)
}
}
}
@@ -110,13 +112,16 @@ func NewEventReader(c Config) (EventProducer, error) {
config: c,
eventC: make(chan Event, 1),
watchedDirs: dirs,
log: log,
}, nil
}

func (r *fsreader) Start(done <-chan struct{}) (<-chan Event, error) {
r.stream.Start()
go r.consumeEvents(done)
logp.Info("%v started FSEvents watcher recursive:%v", logPrefix, r.config.Recursive)
r.log.Infow("Started FSEvents watcher",
"file_path", r.config.Paths,
"recursive", r.config.Recursive)
return r.eventC, nil
}

@@ -127,15 +132,18 @@ func (r *fsreader) consumeEvents(done <-chan struct{}) {
for {
select {
case <-done:
debugf("Terminated")
r.log.Debug("FSEvents reader terminated")
return
case events := <-r.stream.Events:
for _, event := range events {
if !r.isWatched(event.Path) || r.config.IsExcludedPath(event.Path) {
continue
}
debugf("Received FSEvents event: id=%d path=%v flags=%s",
event.ID, event.Path, flagsToString(event.Flags))
r.log.Debugw("Received FSEvents event",
"file_path", event.Path,
"event_id", event.ID,
"event_flags", flagsToString(event.Flags))

start := time.Now()
e := NewEvent(event.Path, flagsToAction(event.Flags), SourceFSNotify,
r.config.MaxFileSizeBytes, r.config.HashTypes)
@@ -183,7 +191,7 @@ func (r *fsreader) isWatched(path string) bool {
dir := filepath.Dir(path)
info, err := getFileInfo(dir)
if err != nil {
logp.Warn("%v failed to get event file info for '%s': %v", logPrefix, dir, err)
r.log.Warnw("failed to get file info", "file_path", dir, "error", err)
return false
}
for _, dir := range r.watchedDirs {
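The fsevents reader above, like the fsnotify reader that follows, now stores its logger as a struct field (log *logp.Logger) created once in the constructor, so every message from that reader goes through the same named logger. A rough sketch of that shape with the watcher machinery stubbed out; the type and field names here are illustrative, not the module's exact ones:

```go
package main

import "github.com/elastic/beats/libbeat/logp"

// watchReader mirrors the pattern in the diff: the logger is created once in
// the constructor and reused by every method on the type.
type watchReader struct {
	paths []string
	log   *logp.Logger
}

func newWatchReader(paths []string) *watchReader {
	return &watchReader{
		paths: paths,
		log:   logp.NewLogger("file_integrity"),
	}
}

func (r *watchReader) Start() {
	r.log.Infow("Started watcher", "file_path", r.paths, "recursive", false)
}

func main() {
	newWatchReader([]string{"/etc", "/bin"}).Start()
}
```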
20 changes: 13 additions & 7 deletions auditbeat/module/file_integrity/eventreader_fsnotify.go
@@ -17,6 +17,7 @@ type reader struct {
watcher monitor.Watcher
config Config
eventC chan Event
log *logp.Logger
}

// NewEventReader creates a new EventProducer backed by fsnotify.
@@ -30,17 +31,19 @@ func NewEventReader(c Config) (EventProducer, error) {
watcher: watcher,
config: c,
eventC: make(chan Event, 1),
log: logp.NewLogger(moduleName),
}, nil
}

func (r *reader) Start(done <-chan struct{}) (<-chan Event, error) {
for _, p := range r.config.Paths {
if err := r.watcher.Add(p); err != nil {
if err == syscall.EMFILE {
logp.Warn("%v Failed to watch %v: %v (check the max number of "+
"open files allowed with 'ulimit -a')", logPrefix, p, err)
r.log.Warnw("Failed to add watch (check the max number of "+
"open files allowed with 'ulimit -a')",
"file_path", p, "error", err)
} else {
logp.Warn("%v Failed to watch %v: %v", logPrefix, p, err)
r.log.Warnw("Failed to add watch", "file_path", p, "error", err)
}
}
}
@@ -49,7 +52,9 @@ func (r *reader) Start(done <-chan struct{}) (<-chan Event, error) {
return nil, errors.Wrap(err, "unable to start watcher")
}
go r.consumeEvents()
logp.Info("%v started fsnotify watcher recursive:%v", logPrefix, r.config.Recursive)
r.log.Infow("Started fsnotify watcher",
"file_path", r.config.Paths,
"recursive", r.config.Recursive)
return r.eventC, nil
}

@@ -63,8 +68,9 @@ func (r *reader) consumeEvents() {
if event.Name == "" || r.config.IsExcludedPath(event.Name) {
continue
}
debugf("Received fsnotify event: path=%v action=%v",
event.Name, event.Op.String())
r.log.Debugw("Received fsnotify event",
"file_path", event.Name,
"event_flags", event.Op)

start := time.Now()
e := NewEvent(event.Name, opToAction(event.Op), SourceFSNotify,
@@ -73,7 +79,7 @@ func (r *reader) consumeEvents() {

r.eventC <- e
case err := <-r.watcher.ErrorChannel():
logp.Warn("%v fsnotify watcher error: %v", logPrefix, err)
r.log.Warnw("fsnotify watcher error", "error", err)
}
}
}
55 changes: 31 additions & 24 deletions auditbeat/module/file_integrity/metricset.go
@@ -18,17 +18,12 @@ import (
const (
moduleName = "file_integrity"
metricsetName = "file"
logPrefix = "[" + moduleName + "]"
bucketName = "file.v1"

// Use old namespace for data until we do some field renaming for GA.
namespace = "audit.file"
)

var (
debugf = logp.MakeDebug(moduleName)
)

func init() {
mb.Registry.MustAddMetricSet(moduleName, metricsetName, New,
mb.DefaultMetricSet(),
@@ -54,6 +49,7 @@ type MetricSet struct {
config Config
reader EventProducer
scanner EventProducer
log *logp.Logger

// Runtime params that are initialized on Run().
bucket datastore.BoltBucket
@@ -80,6 +76,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
BaseMetricSet: base,
config: config,
reader: r,
log: logp.NewLogger(moduleName),
}

if config.ScanAtStart {
@@ -89,7 +86,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
}
}

debugf("Initialized the file event reader. Running as euid=%v", os.Geteuid())
ms.log.Debugf("Initialized the file event reader. Running as euid=%v", os.Geteuid())

return ms, nil
}
@@ -139,7 +136,7 @@ func (ms *MetricSet) init(reporter mb.PushReporterV2) bool {
if err != nil {
err = errors.Wrap(err, "failed to open persistent datastore")
reporter.Error(err)
logp.Err("%v %v", logPrefix, err)
ms.log.Errorw("Failed to initialize", "error", err)
return false
}
ms.bucket = bucket.(datastore.BoltBucket)
@@ -148,7 +145,7 @@ func (ms *MetricSet) init(reporter mb.PushReporterV2) bool {
if err != nil {
err = errors.Wrap(err, "failed to start fsnotify event producer")
reporter.Error(err)
logp.Err("%v %v", logPrefix, err)
ms.log.Errorw("Failed to initialize", "error", err)
return false
}

@@ -158,7 +155,7 @@ func (ms *MetricSet) init(reporter mb.PushReporterV2) bool {
if err != nil {
err = errors.Wrap(err, "failed to start file scanner")
reporter.Error(err)
logp.Err("%v %v", logPrefix, err)
ms.log.Errorw("Failed to initialize", "error", err)
return false
}
}
@@ -167,9 +164,12 @@ func (ms *MetricSet) init(reporter mb.PushReporterV2) bool {
}

func (ms *MetricSet) reportEvent(reporter mb.PushReporterV2, event *Event) bool {
if len(event.errors) > 0 && logp.IsDebug(moduleName) {
debugf("Errors on event for %v with action=%v: %v",
event.Path, event.Action, event.errors)
if len(event.errors) == 1 {
ms.log.Debugw("Error in event", "file_path", event.Path,
"action", event.Action, "error", event.errors[0])
} else if len(event.errors) > 1 {
ms.log.Debugw("Multiple errors in event", "file_path", event.Path,
"action", event.Action, "errors", event.errors)
}

changed, lastEvent := ms.hasFileChangedSinceLastEvent(event)
@@ -183,11 +183,11 @@ func (ms *MetricSet) reportEvent(reporter mb.PushReporterV2, event *Event) bool
// Persist event locally.
if event.Info == nil {
if err := ms.bucket.Delete(event.Path); err != nil {
logp.Err("%v %v", logPrefix, err)
ms.log.Errorw("Failed during DB delete", "error", err)
}
} else {
if err := store(ms.bucket, event); err != nil {
logp.Err("%v %v", logPrefix, err)
ms.log.Errorw("Failed during DB store", "error", err)
}
}
return true
@@ -197,7 +197,7 @@ func (ms *MetricSet) hasFileChangedSinceLastEvent(event *Event) (changed bool, l
// Load event from DB.
lastEvent, err := load(ms.bucket, event.Path)
if err != nil {
logp.Warn("%v %v", logPrefix, err)
ms.log.Warnw("Failed during DB load", "error", err)
return true, lastEvent
}

Expand All @@ -206,18 +206,19 @@ func (ms *MetricSet) hasFileChangedSinceLastEvent(event *Event) (changed bool, l
event.Action = action
}

if changed && logp.IsDebug(moduleName) {
debugf("file at %v has changed since last seen: old=%v, new=%v",
event.Path, lastEvent, event)
if changed {
ms.log.Debugw("File changed since it was last seen",
"file_path", event.Path, "took", event.rtt,
logp.Namespace("event"), "old", lastEvent, "new", event)
}
return changed, lastEvent
}

func (ms *MetricSet) purgeDeleted(reporter mb.PushReporterV2) {
for _, prefix := range ms.config.Paths {
deleted, err := purgeOlder(ms.bucket, ms.scanStart, prefix)
deleted, err := ms.purgeOlder(ms.scanStart, prefix)
if err != nil {
logp.Err("%v %v", logPrefix, err)
ms.log.Errorw("Failure while purging older records", "error", err)
continue
}

@@ -234,7 +235,7 @@ func (ms *MetricSet) purgeDeleted(reporter mb.PushReporterV2) {

// purgeOlder does a prefix scan of the keys in the datastore and purges items
// older than the specified time.
func purgeOlder(b datastore.BoltBucket, t time.Time, prefix string) ([]*Event, error) {
func (ms *MetricSet) purgeOlder(t time.Time, prefix string) ([]*Event, error) {
var (
deleted []*Event
totalKeys uint64
@@ -248,7 +249,7 @@ func purgeOlder(b datastore.BoltBucket, t time.Time, prefix string) ([]*Event, e
startTime = time.Now()
)

err := b.Update(func(b *bolt.Bucket) error {
err := ms.bucket.Update(func(b *bolt.Bucket) error {
c := b.Cursor()

for path, v := c.Seek(p); path != nil && matchesPrefix(path); path, v = c.Next() {
@@ -269,8 +270,14 @@ func purgeOlder(b datastore.BoltBucket, t time.Time, prefix string) ([]*Event, e
return nil
})

debugf("Purged %v of %v entries in %v for %v", len(deleted),
totalKeys, time.Since(startTime), prefix)
took := time.Since(startTime)
ms.log.With(
"file_path", prefix,
"took", took,
"items_total", totalKeys,
"items_deleted", len(deleted)).
Debugf("Purged %v of %v entries in %v for %v", len(deleted), totalKeys,
time.Since(startTime), prefix)
return deleted, err
}

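In metricset.go the purge logging now derives a logger with contextual fields via With before emitting a single Debugf message, and the change-detection path nests the old and new events under an "event" namespace via logp.Namespace. A hedged sketch of both calls, assuming they behave as in the Beats logp package; debug output only appears when debug logging is enabled for the selector (e.g. with -d "file_integrity"):

```go
package main

import (
	"time"

	"github.com/elastic/beats/libbeat/logp"
)

func main() {
	log := logp.NewLogger("file_integrity")

	// Attach contextual fields once; they ride along with the message.
	start := time.Now()
	deleted, total := 3, 120 // illustrative counts
	log.With(
		"file_path", "/etc",
		"took", time.Since(start),
		"items_total", total,
		"items_deleted", deleted).
		Debugf("Purged %v of %v entries", deleted, total)

	// Group related fields under a namespace, e.g. event.old / event.new.
	log.Debugw("File changed since it was last seen",
		"file_path", "/etc/hosts",
		logp.Namespace("event"),
		"old", map[string]interface{}{"size": 10},
		"new", map[string]interface{}{"size": 12})
}
```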