Skip to content

Commit

Permalink
Add initial_scan action for scanning new paths. (#7954)
Browse files Browse the repository at this point in the history
When `scan_at_start: true` the filesystem scanner will look through
all paths and record any new or modified files and directories it finds.
This commit introduces a new action `initial_scan` that the scanner uses
when it scans a path for the first time. Previously, the scanner would
report everything under a new path as `created`, making it impossible
to distinguish between files and directories that are truly new (e.g.
have been added during a restart of Auditbeat) and those that are merely
new because they are under a newly added path in the configuration.

Resolves #7821
  • Loading branch information
Christoph Wurm authored and andrewkroh committed Aug 16, 2018
1 parent 720e9d9 commit 95e53bc
Show file tree
Hide file tree
Showing 9 changed files with 144 additions and 56 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ https://github.com/elastic/beats/compare/v6.4.0...master[Check the HEAD diff]

*Auditbeat*

- Use `initial_scan` action for new paths. {pull}7954[7954]

*Filebeat*

*Heartbeat*
Expand Down
2 changes: 1 addition & 1 deletion auditbeat/_meta/fields.common.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
Action describes the change that triggered the event.
For the file integrity module the possible values are:
attributes_modified, created, deleted, updated, moved, and config_change.
attributes_modified, created, deleted, updated, moved, config_change, and initial_scan.
- name: file
type: group
Expand Down
2 changes: 1 addition & 1 deletion auditbeat/docs/fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -2769,7 +2769,7 @@ type: keyword
example: logged-in
Action describes the change that triggered the event.
For the file integrity module the possible values are: attributes_modified, created, deleted, updated, moved, and config_change.
For the file integrity module the possible values are: attributes_modified, created, deleted, updated, moved, config_change, and initial_scan.
--
Expand Down
2 changes: 1 addition & 1 deletion auditbeat/include/fields.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions auditbeat/module/file_integrity/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const (
Updated
Moved
ConfigChange
InitialScan
)

var actionNames = map[Action]string{
Expand All @@ -47,6 +48,7 @@ var actionNames = map[Action]string{
Updated: "updated",
Moved: "moved",
ConfigChange: "config_change",
InitialScan: "initial_scan",
}

type actionOrderKey struct {
Expand Down
44 changes: 36 additions & 8 deletions auditbeat/module/file_integrity/metricset.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package file_integrity
import (
"bytes"
"os"
"path/filepath"
"time"

bolt "github.com/coreos/bbolt"
Expand Down Expand Up @@ -93,13 +94,6 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
log: logp.NewLogger(moduleName),
}

if config.ScanAtStart {
ms.scanner, err = NewFileSystemScanner(config)
if err != nil {
return nil, errors.Wrap(err, "failed to initialize file scanner")
}
}

ms.log.Debugf("Initialized the file event reader. Running as euid=%v", os.Geteuid())

return ms, nil
Expand Down Expand Up @@ -164,7 +158,15 @@ func (ms *MetricSet) init(reporter mb.PushReporterV2) bool {
}

ms.scanStart = time.Now().UTC()
if ms.scanner != nil {
if ms.config.ScanAtStart {
ms.scanner, err = NewFileSystemScanner(ms.config, ms.findNewPaths())
if err != nil {
err = errors.Wrap(err, "failed to initialize file scanner")
reporter.Error(err)
ms.log.Errorw("Failed to initialize", "error", err)
return false
}

ms.scanChan, err = ms.scanner.Start(reporter.Done())
if err != nil {
err = errors.Wrap(err, "failed to start file scanner")
Expand All @@ -177,6 +179,32 @@ func (ms *MetricSet) init(reporter mb.PushReporterV2) bool {
return true
}

// findNewPaths determines which - if any - paths have been newly added to the config.
func (ms *MetricSet) findNewPaths() map[string]struct{} {
newPaths := make(map[string]struct{})

for _, path := range ms.config.Paths {
// Resolve symlinks to ensure we have an absolute path.
evalPath, err := filepath.EvalSymlinks(path)
if err != nil {
ms.log.Warnw("Failed to resolve", "file_path", path, "error", err)
continue
}

lastEvent, err := load(ms.bucket, evalPath)
if err != nil {
ms.log.Warnw("Failed during DB load", "error", err)
continue
}

if lastEvent == nil {
newPaths[evalPath] = struct{}{}
}
}

return newPaths
}

func (ms *MetricSet) reportEvent(reporter mb.PushReporterV2, event *Event) bool {
if len(event.errors) == 1 {
ms.log.Debugw("Error in event", "file_path", event.Path,
Expand Down
105 changes: 76 additions & 29 deletions auditbeat/module/file_integrity/metricset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package file_integrity

import (
"crypto/sha1"
"io/ioutil"
"os"
"path/filepath"
Expand Down Expand Up @@ -59,7 +60,7 @@ func TestData(t *testing.T) {
mbtest.WriteEventToDataJSON(t, fullEvent, "")
}

func TestDetectDeletedFiles(t *testing.T) {
func TestActions(t *testing.T) {
defer setup(t)()

bucket, err := datastore.OpenBucket(bucketName)
Expand All @@ -68,6 +69,7 @@ func TestDetectDeletedFiles(t *testing.T) {
}
defer bucket.Close()

// First directory
dir, err := ioutil.TempDir("", "audit-file")
if err != nil {
t.Fatal(err)
Expand All @@ -79,46 +81,91 @@ func TestDetectDeletedFiles(t *testing.T) {
t.Fatal(err)
}

// Second directory (to be reported with "initial_scan")
newDir, err := ioutil.TempDir("", "audit-file-new")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(newDir)

newDir, err = filepath.EvalSymlinks(newDir)
if err != nil {
t.Fatal(err)
}

createdFilepath := filepath.Join(dir, "created.txt")
updatedFilepath := filepath.Join(dir, "updated.txt")
deletedFilepath := filepath.Join(dir, "deleted.txt")

// Add first directory to db (so that files in it are not reported with "initial_scan")
e := &Event{
Timestamp: time.Now().UTC(),
Path: filepath.Join(dir, "ghost.file"),
Action: Created,
Path: dir,
Action: InitialScan,
}
if err = store(bucket, e); err != nil {
t.Fatal(err)
}

ms := mbtest.NewPushMetricSetV2(t, getConfig(dir))
events := mbtest.RunPushMetricSetV2(10*time.Second, 2, ms)
for _, e := range events {
if e.Error != nil {
t.Fatalf("received error: %+v", e.Error)
}
// Add fake event for non-existing file to db to simulate when a file has been deleted
deletedFileEvent := &Event{
Timestamp: time.Now().UTC(),
Path: deletedFilepath,
Action: Created,
}
if err = store(bucket, deletedFileEvent); err != nil {
t.Fatal(err)
}

if !assert.Len(t, events, 2) {
return
// Insert fake file event into db to simulate when a file has changed
digest := sha1.New().Sum([]byte("different string"))
updatedFileEvent := &Event{
Timestamp: time.Now().UTC(),
Path: updatedFilepath,
Action: Created,
Hashes: map[HashType]Digest{SHA1: digest},
}
event := events[0].MetricSetFields
path, err := event.GetValue("file.path")
if assert.NoError(t, err) {
assert.Equal(t, dir, path)
if err = store(bucket, updatedFileEvent); err != nil {
t.Fatal(err)
}

action, err := event.GetValue("event.action")
if assert.NoError(t, err) {
assert.Equal(t, []string{"created"}, action)
}
// Create some files in first directory
go func() {
ioutil.WriteFile(createdFilepath, []byte("hello world"), 0600)
ioutil.WriteFile(updatedFilepath, []byte("hello world"), 0600)
}()

event = events[1].MetricSetFields
path, err = event.GetValue("file.path")
if assert.NoError(t, err) {
assert.Equal(t, e.Path, path)
}
ms := mbtest.NewPushMetricSetV2(t, getConfig(dir, newDir))
events := mbtest.RunPushMetricSetV2(10*time.Second, 5, ms)
assert.Len(t, events, 5)

action, err = event.GetValue("event.action")
if assert.NoError(t, err) {
assert.Equal(t, []string{"deleted"}, action)
for _, event := range events {
if event.Error != nil {
t.Fatalf("received error: %+v", event.Error)
}

actions, err := event.MetricSetFields.GetValue("event.action")
path, err2 := event.MetricSetFields.GetValue("file.path")
if assert.NoError(t, err) && assert.NoError(t, err2) {
// Note: Actions reported for a file or directory will be different
// depending on whether the scanner or the platform-dependent
// filesystem event listener reported it. The subset of actions we test
// for here should be consistent across all cases though.
switch path.(string) {
case newDir:
assert.Contains(t, actions, "initial_scan")
case dir:
assert.Contains(t, actions, "attributes_modified")
case deletedFilepath:
assert.Contains(t, actions, "deleted")
case createdFilepath:
assert.Contains(t, actions, "created")
case updatedFilepath:
assert.Contains(t, actions, "updated")
default:
assert.Fail(t, "unexpected path", "path %v", path)
}
}
}
}

Expand Down Expand Up @@ -251,10 +298,10 @@ func setup(t testing.TB) func() {
return func() { os.RemoveAll(paths.Paths.Data) }
}

func getConfig(path string) map[string]interface{} {
func getConfig(path ...string) map[string]interface{} {
return map[string]interface{}{
"module": "file_integrity",
"paths": []string{path},
"paths": path,
"exclude_files": []string{`(?i)\.sw[nop]$`, `[/\\]\.git([/\\]|$)`},
}
}
35 changes: 22 additions & 13 deletions auditbeat/module/file_integrity/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,20 @@ type scanner struct {
done <-chan struct{}
eventC chan Event

log *logp.Logger
config Config
log *logp.Logger
config Config
newPaths map[string]struct{}
}

// NewFileSystemScanner creates a new EventProducer instance that scans the
// configured file paths.
func NewFileSystemScanner(c Config) (EventProducer, error) {
// configured file paths. Files and directories in new paths are recorded with
// the action `found`.
func NewFileSystemScanner(c Config, newPathsInConfig map[string]struct{}) (EventProducer, error) {
return &scanner{
log: logp.NewLogger(moduleName).With("scanner_id", atomic.AddUint32(&scannerID, 1)),
config: c,
eventC: make(chan Event, 1),
log: logp.NewLogger(moduleName).With("scanner_id", atomic.AddUint32(&scannerID, 1)),
config: c,
newPaths: newPathsInConfig,
eventC: make(chan Event, 1),
}, nil
}

Expand Down Expand Up @@ -83,7 +86,7 @@ func (s *scanner) Start(done <-chan struct{}) (<-chan Event, error) {

// scan iterates over the configured paths and generates events for each file.
func (s *scanner) scan() {
s.log.Debugw("File system scanner is starting", "file_path", s.config.Paths)
s.log.Debugw("File system scanner is starting", "file_path", s.config.Paths, "new_path", s.newPaths)
defer s.log.Debug("File system scanner is stopping")
defer close(s.eventC)
startTime := time.Now()
Expand All @@ -96,7 +99,13 @@ func (s *scanner) scan() {
continue
}

if err = s.walkDir(evalPath); err != nil {
// If action is None it will be filled later in the Metricset
action := None
if _, exists := s.newPaths[evalPath]; exists {
action = InitialScan
}

if err = s.walkDir(evalPath, action); err != nil {
s.log.Warnw("Failed to scan", "file_path", evalPath, "error", err)
}
}
Expand All @@ -113,7 +122,7 @@ func (s *scanner) scan() {
)
}

func (s *scanner) walkDir(dir string) error {
func (s *scanner) walkDir(dir string, action Action) error {
errDone := errors.New("done")
startTime := time.Now()
err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
Expand All @@ -133,7 +142,7 @@ func (s *scanner) walkDir(dir string) error {
}
defer func() { startTime = time.Now() }()

event := s.newScanEvent(path, info, err)
event := s.newScanEvent(path, info, err, action)
event.rtt = time.Since(startTime)
select {
case s.eventC <- event:
Expand Down Expand Up @@ -192,8 +201,8 @@ func (s *scanner) throttle(fileSize uint64) {
}
}

func (s *scanner) newScanEvent(path string, info os.FileInfo, err error) Event {
event := NewEventFromFileInfo(path, info, err, None, SourceScan,
func (s *scanner) newScanEvent(path string, info os.FileInfo, err error, action Action) Event {
event := NewEventFromFileInfo(path, info, err, action, SourceScan,
s.config.MaxFileSizeBytes, s.config.HashTypes)

// Update metrics.
Expand Down
6 changes: 3 additions & 3 deletions auditbeat/module/file_integrity/scanner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestScanner(t *testing.T) {
}

t.Run("non-recursive", func(t *testing.T) {
reader, err := NewFileSystemScanner(config)
reader, err := NewFileSystemScanner(config, nil)
if err != nil {
t.Fatal(err)
}
Expand All @@ -62,7 +62,7 @@ func TestScanner(t *testing.T) {
c := config
c.Recursive = true

reader, err := NewFileSystemScanner(c)
reader, err := NewFileSystemScanner(c, nil)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -94,7 +94,7 @@ func TestScanner(t *testing.T) {
c := config
c.ScanRateBytesPerSec = 1024 * 5

reader, err := NewFileSystemScanner(c)
reader, err := NewFileSystemScanner(c, nil)
if err != nil {
t.Fatal(err)
}
Expand Down

0 comments on commit 95e53bc

Please sign in to comment.