From 04faf07a9afc8e4e95d1a36412431847f47af016 Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Wed, 20 Dec 2017 18:30:27 -0500 Subject: [PATCH] Update file_integrity metricset logging --- auditbeat/module/file_integrity/event.go | 48 +++++++--------- .../file_integrity/eventreader_fsevents.go | 20 +++++-- .../file_integrity/eventreader_fsnotify.go | 20 ++++--- auditbeat/module/file_integrity/metricset.go | 55 +++++++++++-------- auditbeat/module/file_integrity/scanner.go | 47 ++++++++-------- 5 files changed, 101 insertions(+), 89 deletions(-) diff --git a/auditbeat/module/file_integrity/event.go b/auditbeat/module/file_integrity/event.go index 88aa3613003..cdf7bf9851f 100644 --- a/auditbeat/module/file_integrity/event.go +++ b/auditbeat/module/file_integrity/event.go @@ -7,7 +7,6 @@ import ( "crypto/sha256" "crypto/sha512" "encoding/hex" - "encoding/json" "fmt" "hash" "io" @@ -92,13 +91,13 @@ func (d Digest) MarshalText() ([]byte, error) { return []byte(d.String()), nil } // Event describe the filesystem change and includes metadata about the file. type Event struct { - Timestamp time.Time // Time of event. - Path string // The path associated with the event. - TargetPath string // Target path for symlinks. - Info *Metadata // File metadata (if the file exists). - Source Source // Source of the event. - Action Action // Action (like created, updated). - Hashes map[HashType]Digest // File hashes. + Timestamp time.Time `json:"timestamp"` // Time of event. + Path string `json:"path"` // The path associated with the event. + TargetPath string `json:"target_path,omitempty"` // Target path for symlinks. + Info *Metadata `json:"info"` // File metadata (if the file exists). + Source Source `json:"source"` // Source of the event. + Action Action `json:"action"` // Action (like created, updated). + Hashes map[HashType]Digest `json:"hash,omitempty"` // File hashes. // Metadata rtt time.Duration // Time taken to collect the info. 
@@ -107,20 +106,20 @@ type Event struct { // Metadata contains file metadata. type Metadata struct { - Inode uint64 - UID uint32 - GID uint32 - SID string - Owner string - Group string - Size uint64 - MTime time.Time // Last modification time. - CTime time.Time // Last metadata change time. - Type Type // File type (dir, file, symlink). - Mode os.FileMode // Permissions - SetUID bool // setuid bit (POSIX only) - SetGID bool // setgid bit (POSIX only) - Origin []string // External origin info for the file (MacOS only) + Inode uint64 `json:"inode"` + UID uint32 `json:"uid"` + GID uint32 `json:"gid"` + SID string `json:"sid"` + Owner string `json:"owner"` + Group string `json:"group"` + Size uint64 `json:"size"` + MTime time.Time `json:"mtime"` // Last modification time. + CTime time.Time `json:"ctime"` // Last metadata change time. + Type Type `json:"type"` // File type (dir, file, symlink). + Mode os.FileMode `json:"mode"` // Permissions + SetUID bool `json:"setuid"` // setuid bit (POSIX only) + SetGID bool `json:"setgid"` // setgid bit (POSIX only) + Origin []string `json:"origin"` // External origin info for the file (MacOS only) } // NewEventFromFileInfo creates a new Event based on data from a os.FileInfo @@ -197,11 +196,6 @@ func NewEvent( return NewEventFromFileInfo(path, info, err, action, source, maxFileSize, hashTypes) } -func (e *Event) String() string { - data, _ := json.Marshal(e) - return string(data) -} - func buildMetricbeatEvent(e *Event, existedBefore bool) mb.Event { m := common.MapStr{ "path": e.Path, diff --git a/auditbeat/module/file_integrity/eventreader_fsevents.go b/auditbeat/module/file_integrity/eventreader_fsevents.go index d01807e2509..38b43d3520c 100644 --- a/auditbeat/module/file_integrity/eventreader_fsevents.go +++ b/auditbeat/module/file_integrity/eventreader_fsevents.go @@ -26,6 +26,7 @@ type fsreader struct { config Config eventC chan Event watchedDirs []os.FileInfo + log *logp.Logger } var flagToAction = 
map[fsevents.EventFlags]Action{ @@ -95,13 +96,14 @@ func NewEventReader(c Config) (EventProducer, error) { stream.Flags |= fsevents.IgnoreSelf } + log := logp.NewLogger(moduleName) var dirs []os.FileInfo if !c.Recursive { for _, path := range c.Paths { if info, err := getFileInfo(path); err == nil { dirs = append(dirs, info) } else { - logp.Warn("%v failed to get file info for '%s': %v", logPrefix, path, err) + log.Warnw("Failed to get file info", "file_path", path, "error", err) } } } @@ -110,13 +112,16 @@ func NewEventReader(c Config) (EventProducer, error) { config: c, eventC: make(chan Event, 1), watchedDirs: dirs, + log: log, }, nil } func (r *fsreader) Start(done <-chan struct{}) (<-chan Event, error) { r.stream.Start() go r.consumeEvents(done) - logp.Info("%v started FSEvents watcher recursive:%v", logPrefix, r.config.Recursive) + r.log.Infow("Started FSEvents watcher", + "file_path", r.config.Paths, + "recursive", r.config.Recursive) return r.eventC, nil } @@ -127,15 +132,18 @@ func (r *fsreader) consumeEvents(done <-chan struct{}) { for { select { case <-done: - debugf("Terminated") + r.log.Debug("FSEvents reader terminated") return case events := <-r.stream.Events: for _, event := range events { if !r.isWatched(event.Path) || r.config.IsExcludedPath(event.Path) { continue } - debugf("Received FSEvents event: id=%d path=%v flags=%s", - event.ID, event.Path, flagsToString(event.Flags)) + r.log.Debugw("Received FSEvents event", + "file_path", event.Path, + "event_id", event.ID, + "event_flags", flagsToString(event.Flags)) + start := time.Now() e := NewEvent(event.Path, flagsToAction(event.Flags), SourceFSNotify, r.config.MaxFileSizeBytes, r.config.HashTypes) @@ -183,7 +191,7 @@ func (r *fsreader) isWatched(path string) bool { dir := filepath.Dir(path) info, err := getFileInfo(dir) if err != nil { - logp.Warn("%v failed to get event file info for '%s': %v", logPrefix, dir, err) + r.log.Warnw("Failed to get file info", "file_path", dir, "error", err) return
false } for _, dir := range r.watchedDirs { diff --git a/auditbeat/module/file_integrity/eventreader_fsnotify.go b/auditbeat/module/file_integrity/eventreader_fsnotify.go index 055aeee09a6..a079a920b8c 100644 --- a/auditbeat/module/file_integrity/eventreader_fsnotify.go +++ b/auditbeat/module/file_integrity/eventreader_fsnotify.go @@ -17,6 +17,7 @@ type reader struct { watcher monitor.Watcher config Config eventC chan Event + log *logp.Logger } // NewEventReader creates a new EventProducer backed by fsnotify. @@ -30,6 +31,7 @@ func NewEventReader(c Config) (EventProducer, error) { watcher: watcher, config: c, eventC: make(chan Event, 1), + log: logp.NewLogger(moduleName), }, nil } @@ -37,10 +39,11 @@ func (r *reader) Start(done <-chan struct{}) (<-chan Event, error) { for _, p := range r.config.Paths { if err := r.watcher.Add(p); err != nil { if err == syscall.EMFILE { - logp.Warn("%v Failed to watch %v: %v (check the max number of "+ - "open files allowed with 'ulimit -a')", logPrefix, p, err) + r.log.Warnw("Failed to add watch (check the max number of "+ + "open files allowed with 'ulimit -a')", + "file_path", p, "error", err) } else { - logp.Warn("%v Failed to watch %v: %v", logPrefix, p, err) + r.log.Warnw("Failed to add watch", "file_path", p, "error", err) } } } @@ -49,7 +52,9 @@ func (r *reader) Start(done <-chan struct{}) (<-chan Event, error) { return nil, errors.Wrap(err, "unable to start watcher") } go r.consumeEvents() - logp.Info("%v started fsnotify watcher recursive:%v", logPrefix, r.config.Recursive) + r.log.Infow("Started fsnotify watcher", + "file_path", r.config.Paths, + "recursive", r.config.Recursive) return r.eventC, nil } @@ -63,8 +68,9 @@ func (r *reader) consumeEvents() { if event.Name == "" || r.config.IsExcludedPath(event.Name) { continue } - debugf("Received fsnotify event: path=%v action=%v", - event.Name, event.Op.String()) + r.log.Debugw("Received fsnotify event", + "file_path", event.Name, + "event_flags", event.Op) start := 
time.Now() e := NewEvent(event.Name, opToAction(event.Op), SourceFSNotify, @@ -73,7 +79,7 @@ func (r *reader) consumeEvents() { r.eventC <- e case err := <-r.watcher.ErrorChannel(): - logp.Warn("%v fsnotify watcher error: %v", logPrefix, err) + r.log.Warnw("fsnotify watcher error", "error", err) } } } diff --git a/auditbeat/module/file_integrity/metricset.go b/auditbeat/module/file_integrity/metricset.go index f2b0ac6559d..eda2f10d02b 100644 --- a/auditbeat/module/file_integrity/metricset.go +++ b/auditbeat/module/file_integrity/metricset.go @@ -18,17 +18,12 @@ import ( const ( moduleName = "file_integrity" metricsetName = "file" - logPrefix = "[" + moduleName + "]" bucketName = "file.v1" // Use old namespace for data until we do some field renaming for GA. namespace = "audit.file" ) -var ( - debugf = logp.MakeDebug(moduleName) -) - func init() { mb.Registry.MustAddMetricSet(moduleName, metricsetName, New, mb.DefaultMetricSet(), @@ -54,6 +49,7 @@ type MetricSet struct { config Config reader EventProducer scanner EventProducer + log *logp.Logger // Runtime params that are initialized on Run(). bucket datastore.BoltBucket @@ -80,6 +76,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { BaseMetricSet: base, config: config, reader: r, + log: logp.NewLogger(moduleName), } if config.ScanAtStart { @@ -89,7 +86,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { } } - debugf("Initialized the file event reader. Running as euid=%v", os.Geteuid()) + ms.log.Debugf("Initialized the file event reader. 
Running as euid=%v", os.Geteuid()) return ms, nil } @@ -139,7 +136,7 @@ func (ms *MetricSet) init(reporter mb.PushReporterV2) bool { if err != nil { err = errors.Wrap(err, "failed to open persistent datastore") reporter.Error(err) - logp.Err("%v %v", logPrefix, err) + ms.log.Errorw("Failed to initialize", "error", err) return false } ms.bucket = bucket.(datastore.BoltBucket) @@ -148,7 +145,7 @@ func (ms *MetricSet) init(reporter mb.PushReporterV2) bool { if err != nil { err = errors.Wrap(err, "failed to start fsnotify event producer") reporter.Error(err) - logp.Err("%v %v", logPrefix, err) + ms.log.Errorw("Failed to initialize", "error", err) return false } @@ -158,7 +155,7 @@ func (ms *MetricSet) init(reporter mb.PushReporterV2) bool { if err != nil { err = errors.Wrap(err, "failed to start file scanner") reporter.Error(err) - logp.Err("%v %v", logPrefix, err) + ms.log.Errorw("Failed to initialize", "error", err) return false } } @@ -167,9 +164,12 @@ func (ms *MetricSet) init(reporter mb.PushReporterV2) bool { } func (ms *MetricSet) reportEvent(reporter mb.PushReporterV2, event *Event) bool { - if len(event.errors) > 0 && logp.IsDebug(moduleName) { - debugf("Errors on event for %v with action=%v: %v", - event.Path, event.Action, event.errors) + if len(event.errors) == 1 { + ms.log.Debugw("Error in event", "file_path", event.Path, + "action", event.Action, "error", event.errors[0]) + } else if len(event.errors) > 1 { + ms.log.Debugw("Multiple errors in event", "file_path", event.Path, + "action", event.Action, "errors", event.errors) } changed, lastEvent := ms.hasFileChangedSinceLastEvent(event) @@ -183,11 +183,11 @@ func (ms *MetricSet) reportEvent(reporter mb.PushReporterV2, event *Event) bool // Persist event locally. 
if event.Info == nil { if err := ms.bucket.Delete(event.Path); err != nil { - logp.Err("%v %v", logPrefix, err) + ms.log.Errorw("Failed during DB delete", "error", err) } } else { if err := store(ms.bucket, event); err != nil { - logp.Err("%v %v", logPrefix, err) + ms.log.Errorw("Failed during DB store", "error", err) } } return true @@ -197,7 +197,7 @@ func (ms *MetricSet) hasFileChangedSinceLastEvent(event *Event) (changed bool, l // Load event from DB. lastEvent, err := load(ms.bucket, event.Path) if err != nil { - logp.Warn("%v %v", logPrefix, err) + ms.log.Warnw("Failed during DB load", "error", err) return true, lastEvent } @@ -206,18 +206,19 @@ func (ms *MetricSet) hasFileChangedSinceLastEvent(event *Event) (changed bool, l event.Action = action } - if changed && logp.IsDebug(moduleName) { - debugf("file at %v has changed since last seen: old=%v, new=%v", - event.Path, lastEvent, event) + if changed { + ms.log.Debugw("File changed since it was last seen", + "file_path", event.Path, "took", event.rtt, + logp.Namespace("event"), "old", lastEvent, "new", event) } return changed, lastEvent } func (ms *MetricSet) purgeDeleted(reporter mb.PushReporterV2) { for _, prefix := range ms.config.Paths { - deleted, err := purgeOlder(ms.bucket, ms.scanStart, prefix) + deleted, err := ms.purgeOlder(ms.scanStart, prefix) if err != nil { - logp.Err("%v %v", logPrefix, err) + ms.log.Errorw("Failure while purging older records", "error", err) continue } @@ -234,7 +235,7 @@ func (ms *MetricSet) purgeDeleted(reporter mb.PushReporterV2) { // purgeOlder does a prefix scan of the keys in the datastore and purges items // older than the specified time. 
-func purgeOlder(b datastore.BoltBucket, t time.Time, prefix string) ([]*Event, error) { +func (ms *MetricSet) purgeOlder(t time.Time, prefix string) ([]*Event, error) { var ( deleted []*Event totalKeys uint64 @@ -248,7 +249,7 @@ func purgeOlder(b datastore.BoltBucket, t time.Time, prefix string) ([]*Event, e startTime = time.Now() ) - err := b.Update(func(b *bolt.Bucket) error { + err := ms.bucket.Update(func(b *bolt.Bucket) error { c := b.Cursor() for path, v := c.Seek(p); path != nil && matchesPrefix(path); path, v = c.Next() { @@ -269,8 +270,14 @@ func purgeOlder(b datastore.BoltBucket, t time.Time, prefix string) ([]*Event, e return nil }) - debugf("Purged %v of %v entries in %v for %v", len(deleted), - totalKeys, time.Since(startTime), prefix) + took := time.Since(startTime) + ms.log.With( + "file_path", prefix, + "took", took, + "items_total", totalKeys, + "items_deleted", len(deleted)). + Debugf("Purged %v of %v entries in %v for %v", len(deleted), totalKeys, + took, prefix) return deleted, err } diff --git a/auditbeat/module/file_integrity/scanner.go b/auditbeat/module/file_integrity/scanner.go index c64f06d96ee..6f7bc12a4dc 100644 --- a/auditbeat/module/file_integrity/scanner.go +++ b/auditbeat/module/file_integrity/scanner.go @@ -2,15 +2,12 @@ package file_integrity import ( "errors" - "fmt" "math" "os" "path/filepath" - "strings" "sync/atomic" "time" - "github.com/dustin/go-humanize" "github.com/juju/ratelimit" "github.com/elastic/beats/libbeat/logp" @@ -29,20 +26,17 @@ type scanner struct { done <-chan struct{} eventC chan Event - logID string // Unique ID to correlate log messages to a single instance. - logPrefix string - config Config + log *logp.Logger + config Config } // NewFileSystemScanner creates a new EventProducer instance that scans the // configured file paths.
func NewFileSystemScanner(c Config) (EventProducer, error) { - logID := fmt.Sprintf("[scanner-%v]", atomic.AddUint32(&scannerID, 1)) return &scanner{ - logID: logID, - logPrefix: fmt.Sprintf("%v %v", logPrefix, logID), - config: c, - eventC: make(chan Event, 1), + log: logp.NewLogger(moduleName).With("scanner_id", atomic.AddUint32(&scannerID, 1)), + config: c, + eventC: make(chan Event, 1), }, nil } @@ -54,9 +48,12 @@ func (s *scanner) Start(done <-chan struct{}) (<-chan Event, error) { s.done = done if s.config.ScanRateBytesPerSec > 0 { - debugf("%v creating token bucket with rate %v/sec and capacity %v", - s.logID, s.config.ScanRatePerSec, - humanize.Bytes(s.config.MaxFileSizeBytes)) + s.log.With( + "bytes_per_sec", s.config.ScanRateBytesPerSec, + "capacity_bytes", s.config.MaxFileSizeBytes). + Debugf("Creating token bucket with rate %v/sec and capacity %v", + s.config.ScanRatePerSec, + s.config.MaxFileSize) s.tokenBucket = ratelimit.NewBucketWithRate( float64(s.config.ScanRateBytesPerSec)/2., // Fill Rate @@ -70,11 +67,8 @@ func (s *scanner) Start(done <-chan struct{}) (<-chan Event, error) { // scan iterates over the configured paths and generates events for each file. func (s *scanner) scan() { - if logp.IsDebug(metricsetName) { - debugf("%v File system scanner is starting for paths [%v].", - s.logID, strings.Join(s.config.Paths, ", ")) - defer debugf("%v File system scanner is stopping.", s.logID) - } + s.log.Debugw("File system scanner is starting", "file_path", s.config.Paths) + defer s.log.Debug("File system scanner is stopping") defer close(s.eventC) startTime := time.Now() @@ -82,22 +76,25 @@ func (s *scanner) scan() { // Resolve symlinks to ensure we have an absolute path. 
evalPath, err := filepath.EvalSymlinks(path) if err != nil { - logp.Warn("%v failed to scan %v: %v", s.logPrefix, path, err) + s.log.Warnw("Failed to scan", "file_path", path, "error", err) continue } if err = s.walkDir(evalPath); err != nil { - logp.Warn("%v failed to scan %v: %v", s.logPrefix, evalPath, err) + s.log.Warnw("Failed to scan", "file_path", evalPath, "error", err) } } duration := time.Since(startTime) byteCount := atomic.LoadUint64(&s.byteCount) fileCount := atomic.LoadUint64(&s.fileCount) - logp.Info("%v File system scan completed after %v (%v files, %v bytes, %v/sec, %f files/sec).", - s.logPrefix, duration, s.fileCount, byteCount, - humanize.Bytes(uint64(float64(byteCount)/float64(duration)*float64(time.Second))), - float64(fileCount)/float64(duration)*float64(time.Second)) + s.log.Infow("File system scan completed", + "took", duration, + "file_count", fileCount, + "total_bytes", byteCount, + "bytes_per_sec", float64(byteCount)/float64(duration)*float64(time.Second), + "files_per_sec", float64(fileCount)/float64(duration)*float64(time.Second), + ) } func (s *scanner) walkDir(dir string) error {