Skip to content

Commit

Permalink
fsnotify-backed recursive watcher for Linux (#5833)
Browse files Browse the repository at this point in the history
Adds a wrapper on top of fsnotify that allows to place
recursive watches on directories. Exhibits similar behavior
to FSEvents.

This adds an explanation to the `file.recursive` flag and also updates
the description of the underlying implementation for different OSes, to
account for recent changes.
* Document file.recursive flag.

* fsnotify-backed recursive watcher

* Update CHANGELOG for recursive file monitoring

* Updated auditbeat file metric docs
  • Loading branch information
andrewkroh authored Dec 13, 2017
2 parents 2e3ac67 + f3ce7a7 commit 5e66f26
Show file tree
Hide file tree
Showing 11 changed files with 806 additions and 18 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di

- Add support for SHA3 hash algorithms to the file integrity module. {issue}5345[5345]
- Add dashboards for Linux audit framework events (overview, executions, sockets). {pull}5516[5516]
- Add support for recursive file watches under macOS {pull}5575[5575] and Linux. {pull}5833[5833]

*Filebeat*

Expand Down
3 changes: 3 additions & 0 deletions auditbeat/auditbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ auditbeat.modules:
# sha3_384 and sha3_512. Default is sha1.
file.hash_types: [sha1]

# Detect changes to files included in subdirectories. Disabled by default.
file.recursive: false


#================================ General ======================================

Expand Down
3 changes: 3 additions & 0 deletions auditbeat/module/audit/_meta/config.yml.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,7 @@
# sha224, sha256, sha384, sha512, sha512_224, sha512_256, sha3_224, sha3_256,
# sha3_384 and sha3_512. Default is sha1.
file.hash_types: [sha1]

# Detect changes to files included in subdirectories. Disabled by default.
file.recursive: false
{{- end }}
24 changes: 15 additions & 9 deletions auditbeat/module/audit/file/_meta/docs.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ The operating system features that power this feature are as follows.

* Linux - `inotify` is used, and therefore the kernel must have inotify support.
Inotify was initially merged into the 2.6.13 Linux kernel.
* macOS (Darwin) - `kqueue` is used. It requires one file descriptor for each
file so please check the `ulimit` values used with {beatname_uc}. The FSEvents
API was considered for the implementation, but FSEvents coalesces multiple
notifications into a single event which is inconsistent with the metricset's
behavior on other operating systems.
* macOS (Darwin) - Uses the `FSEvents` API, present since macOS 10.5. This API
coalesces multiple changes to a file into a single event. {beatname_uc} translates
this coalesced changes into a meaningful sequence of actions. However,
in rare situations the reported events may have a different ordering than what
actually happened.
* Windows - `ReadDirectoryChangesW` is used.

The file metricset should not be used to monitor paths on network file systems.
Expand All @@ -53,11 +53,11 @@ Linux.
file.scan_rate_per_sec: 50 MiB
file.max_file_size: 100 MiB
file.hash_types: [sha1]
file.recursive: false
----

*`file.paths`*:: A list of paths (directories or files) to watch. The watches
are non-recursive and globs are not supported. The specified paths should exist
when the metricset is started.
*`file.paths`*:: A list of paths (directories or files) to watch. Globs are
not supported. The specified paths should exist when the metricset is started.

*`file.scan_at_start`*:: A boolean value that controls if {beatname_uc} scans
over the configured file paths at startup and send events for the files
Expand All @@ -84,4 +84,10 @@ a suffix to the value. The supported units are `b` (default), `kib`, `kb`, `mib`

*`file.hash_types`*:: A list of hash types to compute when the file changes.
The supported hash types are md5, sha1, sha224, sha256, sha384, sha512,
sha512_224, sha512_256, sha3_224, sha3_256, sha3_384 and sha3_512. The default value is sha1.
sha512_224, sha512_256, sha3_224, sha3_256, sha3_384 and sha3_512. The default
value is sha1.

*`file.recursive`*:: By default, the watches set to the paths specified in
`file.paths` are not recursive. This means that only changes to the contents
of this directories are watched. If `file.recursive` is set to `true`, the file
metric will watch for changes on this directories and all their subdirectories.
19 changes: 10 additions & 9 deletions auditbeat/module/audit/file/eventreader_fsnotify.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,25 @@
package file

import (
"errors"
"syscall"
"time"

"github.com/fsnotify/fsnotify"
"github.com/pkg/errors"

"github.com/elastic/beats/auditbeat/module/audit/file/monitor"
"github.com/elastic/beats/libbeat/logp"
)

type reader struct {
watcher *fsnotify.Watcher
watcher monitor.Watcher
config Config
eventC chan Event
}

// NewEventReader creates a new EventProducer backed by fsnotify.
func NewEventReader(c Config) (EventProducer, error) {
if c.Recursive {
return nil, errors.New("recursive file auditing not supported in this platform (see file.recursive)")
}
watcher, err := fsnotify.NewWatcher()
watcher, err := monitor.New(c.Recursive)
if err != nil {
return nil, err
}
Expand All @@ -47,8 +45,11 @@ func (r *reader) Start(done <-chan struct{}) (<-chan Event, error) {
}
}

if err := r.watcher.Start(); err != nil {
return nil, errors.Wrap(err, "unable to start watcher")
}
go r.consumeEvents()
logp.Info("%v started fsnotify watcher", logPrefix)
logp.Info("%v started fsnotify watcher recursive:%v", logPrefix, r.config.Recursive)
return r.eventC, nil
}

Expand All @@ -58,7 +59,7 @@ func (r *reader) consumeEvents() {

for {
select {
case event := <-r.watcher.Events:
case event := <-r.watcher.EventChannel():
if event.Name == "" {
continue
}
Expand All @@ -71,7 +72,7 @@ func (r *reader) consumeEvents() {
e.rtt = time.Since(start)

r.eventC <- e
case err := <-r.watcher.Errors:
case err := <-r.watcher.ErrorChannel():
logp.Warn("%v fsnotify watcher error: %v", logPrefix, err)
}
}
Expand Down
142 changes: 142 additions & 0 deletions auditbeat/module/audit/file/monitor/filetree.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package monitor

import (
"fmt"
"os"
path_pkg "path"
"strings"
)

// VisitOrder is a two-valued flag used to control how directories are visited.
type VisitOrder int8

const (
// PreOrder has directories visited before their contents.
PreOrder VisitOrder = iota
// PostOrder has directories visited after their contents.
PostOrder
)

var (
// PathSeparator can be used to override the operating system separator.
PathSeparator = string(os.PathSeparator)
)

// FileTree represents a directory in a filesystem-tree structure.
type FileTree map[string]FileTree

// VisitFunc is the type for a callback to visit the entries on a directory
// and its subdirectories.
type VisitFunc func(path string, isDir bool) error

// AddFile adds a file to a FileTree. If the path includes subdirectories
// they are created as necessary.
func (tree FileTree) AddFile(path string) error {
return tree.add(path_pkg.Clean(path), nil)
}

// AddDir adds a directory to a FileTree. If the path includes subdirectories
// they are created as necessary.
func (tree FileTree) AddDir(path string) error {
return tree.add(path_pkg.Clean(path), FileTree{})
}

// Remove an entry from a FileTree.
func (tree FileTree) Remove(path string) error {
components := strings.Split(path, PathSeparator)
last := -1
for pos := len(components) - 1; pos >= 0; pos-- {
if len(components[pos]) != 0 {
last = pos
break
}
}
if last > 0 {
subtree, err := tree.getByComponents(path, components[:last])
if err != nil {
return err
}
delete(subtree, components[last])
}
return nil
}

// Visit calls the callback function for the given path and recursively all its
// contents, if a directory path is passed.
func (tree FileTree) Visit(path string, order VisitOrder, fn VisitFunc) error {
entry, err := tree.At(path)
if err != nil {
return err
}
return entry.visitDirRecursive(path_pkg.Clean(path), order, fn)
}

// At returns a new FileTree rooted at the given path.
func (tree FileTree) At(path string) (FileTree, error) {
return tree.getByComponents(path, strings.Split(path, PathSeparator))
}

func (tree FileTree) add(path string, value FileTree) error {
components := strings.Split(path, PathSeparator)
dir, last := tree, len(components)-1
for i := 0; i < last; i++ {
if len(components[i]) == 0 {
continue
}
if next, exists := dir[components[i]]; exists {
if next == nil {
return fmt.Errorf("directory expected: '%s' in %s", components[i], path)
}
dir = next
} else {
newDir := FileTree{}
dir[components[i]] = newDir
dir = newDir
}
}
dir[components[last]] = value
return nil
}

func (tree FileTree) getByComponents(path string, components []string) (FileTree, error) {
dir, exists := tree, false
for _, item := range components {
if len(item) != 0 {
if dir == nil {
// previous component is a file, not a directory
return nil, fmt.Errorf("path component %s is a file: %s", item, path)
}
if dir, exists = dir[item]; !exists {
return nil, fmt.Errorf("path component %s not found in %s", item, path)
}
}
}
return dir, nil
}

func (tree FileTree) visitDirRecursive(path string, order VisitOrder, fn VisitFunc) error {
if tree == nil {
return fn(path, false)
}
if order == PreOrder {
if err := fn(path, true); err != nil {
return err
}
}
for name, content := range tree {
fullpath := path_pkg.Join(path, name)
if content == nil {
if err := fn(fullpath, false); err != nil {
return err
}
} else {
if err := content.visitDirRecursive(fullpath, order, fn); err != nil {
return err
}
}
}
if order == PostOrder {
return fn(path, true)
}
return nil
}
Loading

0 comments on commit 5e66f26

Please sign in to comment.