Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Auditbeat: initial_scan action for new paths #7954

Merged
merged 1 commit into from
Aug 16, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ https://github.com/elastic/beats/compare/v6.4.0...master[Check the HEAD diff]

*Auditbeat*

- Use `initial_scan` action for new paths. {pull}7954[7954]

*Filebeat*

*Heartbeat*
Expand Down
2 changes: 1 addition & 1 deletion auditbeat/_meta/fields.common.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
Action describes the change that triggered the event.

For the file integrity module the possible values are:
attributes_modified, created, deleted, updated, moved, and config_change.
attributes_modified, created, deleted, updated, moved, config_change, and initial_scan.

- name: file
type: group
Expand Down
2 changes: 1 addition & 1 deletion auditbeat/docs/fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -2769,7 +2769,7 @@ type: keyword
example: logged-in

Action describes the change that triggered the event.
For the file integrity module the possible values are: attributes_modified, created, deleted, updated, moved, and config_change.
For the file integrity module the possible values are: attributes_modified, created, deleted, updated, moved, config_change, and initial_scan.


--
Expand Down
2 changes: 1 addition & 1 deletion auditbeat/include/fields.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions auditbeat/module/file_integrity/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const (
Updated
Moved
ConfigChange
InitialScan
)

var actionNames = map[Action]string{
Expand All @@ -47,6 +48,7 @@ var actionNames = map[Action]string{
Updated: "updated",
Moved: "moved",
ConfigChange: "config_change",
InitialScan: "initial_scan",
}

type actionOrderKey struct {
Expand Down
44 changes: 36 additions & 8 deletions auditbeat/module/file_integrity/metricset.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package file_integrity
import (
"bytes"
"os"
"path/filepath"
"time"

bolt "github.com/coreos/bbolt"
Expand Down Expand Up @@ -93,13 +94,6 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
log: logp.NewLogger(moduleName),
}

if config.ScanAtStart {
ms.scanner, err = NewFileSystemScanner(config)
if err != nil {
return nil, errors.Wrap(err, "failed to initialize file scanner")
}
}

ms.log.Debugf("Initialized the file event reader. Running as euid=%v", os.Geteuid())

return ms, nil
Expand Down Expand Up @@ -164,7 +158,15 @@ func (ms *MetricSet) init(reporter mb.PushReporterV2) bool {
}

ms.scanStart = time.Now().UTC()
if ms.scanner != nil {
if ms.config.ScanAtStart {
ms.scanner, err = NewFileSystemScanner(ms.config, ms.findNewPaths())
if err != nil {
err = errors.Wrap(err, "failed to initialize file scanner")
reporter.Error(err)
ms.log.Errorw("Failed to initialize", "error", err)
return false
}

ms.scanChan, err = ms.scanner.Start(reporter.Done())
if err != nil {
err = errors.Wrap(err, "failed to start file scanner")
Expand All @@ -177,6 +179,32 @@ func (ms *MetricSet) init(reporter mb.PushReporterV2) bool {
return true
}

// findNewPaths determines which - if any - paths have been newly added to the config.
func (ms *MetricSet) findNewPaths() map[string]struct{} {
newPaths := make(map[string]struct{})

for _, path := range ms.config.Paths {
// Resolve symlinks to ensure we have an absolute path.
evalPath, err := filepath.EvalSymlinks(path)
if err != nil {
ms.log.Warnw("Failed to resolve", "file_path", path, "error", err)
continue
}

lastEvent, err := load(ms.bucket, evalPath)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like nice clear solution to the problem. 👍

if err != nil {
ms.log.Warnw("Failed during DB load", "error", err)
continue
}

if lastEvent == nil {
newPaths[evalPath] = struct{}{}
}
}

return newPaths
}

func (ms *MetricSet) reportEvent(reporter mb.PushReporterV2, event *Event) bool {
if len(event.errors) == 1 {
ms.log.Debugw("Error in event", "file_path", event.Path,
Expand Down
105 changes: 76 additions & 29 deletions auditbeat/module/file_integrity/metricset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package file_integrity

import (
"crypto/sha1"
"io/ioutil"
"os"
"path/filepath"
Expand Down Expand Up @@ -59,7 +60,7 @@ func TestData(t *testing.T) {
mbtest.WriteEventToDataJSON(t, fullEvent, "")
}

func TestDetectDeletedFiles(t *testing.T) {
func TestActions(t *testing.T) {
defer setup(t)()

bucket, err := datastore.OpenBucket(bucketName)
Expand All @@ -68,6 +69,7 @@ func TestDetectDeletedFiles(t *testing.T) {
}
defer bucket.Close()

// First directory
dir, err := ioutil.TempDir("", "audit-file")
if err != nil {
t.Fatal(err)
Expand All @@ -79,46 +81,91 @@ func TestDetectDeletedFiles(t *testing.T) {
t.Fatal(err)
}

// Second directory (to be reported with "initial_scan")
newDir, err := ioutil.TempDir("", "audit-file-new")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(newDir)

newDir, err = filepath.EvalSymlinks(newDir)
if err != nil {
t.Fatal(err)
}

createdFilepath := filepath.Join(dir, "created.txt")
updatedFilepath := filepath.Join(dir, "updated.txt")
deletedFilepath := filepath.Join(dir, "deleted.txt")

// Add first directory to db (so that files in it are not reported with "initial_scan")
e := &Event{
Timestamp: time.Now().UTC(),
Path: filepath.Join(dir, "ghost.file"),
Action: Created,
Path: dir,
Action: InitialScan,
}
if err = store(bucket, e); err != nil {
t.Fatal(err)
}

ms := mbtest.NewPushMetricSetV2(t, getConfig(dir))
events := mbtest.RunPushMetricSetV2(10*time.Second, 2, ms)
for _, e := range events {
if e.Error != nil {
t.Fatalf("received error: %+v", e.Error)
}
// Add fake event for non-existing file to db to simulate when a file has been deleted
deletedFileEvent := &Event{
Timestamp: time.Now().UTC(),
Path: deletedFilepath,
Action: Created,
}
if err = store(bucket, deletedFileEvent); err != nil {
t.Fatal(err)
}

if !assert.Len(t, events, 2) {
return
// Insert fake file event into db to simulate when a file has changed
digest := sha1.New().Sum([]byte("different string"))
updatedFileEvent := &Event{
Timestamp: time.Now().UTC(),
Path: updatedFilepath,
Action: Created,
Hashes: map[HashType]Digest{SHA1: digest},
}
event := events[0].MetricSetFields
path, err := event.GetValue("file.path")
if assert.NoError(t, err) {
assert.Equal(t, dir, path)
if err = store(bucket, updatedFileEvent); err != nil {
t.Fatal(err)
}

action, err := event.GetValue("event.action")
if assert.NoError(t, err) {
assert.Equal(t, []string{"created"}, action)
}
// Create some files in first directory
go func() {
ioutil.WriteFile(createdFilepath, []byte("hello world"), 0600)
ioutil.WriteFile(updatedFilepath, []byte("hello world"), 0600)
}()

event = events[1].MetricSetFields
path, err = event.GetValue("file.path")
if assert.NoError(t, err) {
assert.Equal(t, e.Path, path)
}
ms := mbtest.NewPushMetricSetV2(t, getConfig(dir, newDir))
events := mbtest.RunPushMetricSetV2(10*time.Second, 5, ms)
assert.Len(t, events, 5)

action, err = event.GetValue("event.action")
if assert.NoError(t, err) {
assert.Equal(t, []string{"deleted"}, action)
for _, event := range events {
if event.Error != nil {
t.Fatalf("received error: %+v", event.Error)
}

actions, err := event.MetricSetFields.GetValue("event.action")
path, err2 := event.MetricSetFields.GetValue("file.path")
if assert.NoError(t, err) && assert.NoError(t, err2) {
// Note: Actions reported for a file or directory will be different
// depending on whether the scanner or the platform-dependent
// filesystem event listener reported it. The subset of actions we test
// for here should be consistent across all cases though.
switch path.(string) {
case newDir:
assert.Contains(t, actions, "initial_scan")
case dir:
assert.Contains(t, actions, "attributes_modified")
case deletedFilepath:
assert.Contains(t, actions, "deleted")
case createdFilepath:
assert.Contains(t, actions, "created")
case updatedFilepath:
assert.Contains(t, actions, "updated")
default:
assert.Fail(t, "unexpected path", "path %v", path)
}
}
}
}

Expand Down Expand Up @@ -251,10 +298,10 @@ func setup(t testing.TB) func() {
return func() { os.RemoveAll(paths.Paths.Data) }
}

func getConfig(path string) map[string]interface{} {
func getConfig(path ...string) map[string]interface{} {
return map[string]interface{}{
"module": "file_integrity",
"paths": []string{path},
"paths": path,
"exclude_files": []string{`(?i)\.sw[nop]$`, `[/\\]\.git([/\\]|$)`},
}
}
35 changes: 22 additions & 13 deletions auditbeat/module/file_integrity/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,20 @@ type scanner struct {
done <-chan struct{}
eventC chan Event

log *logp.Logger
config Config
log *logp.Logger
config Config
newPaths map[string]struct{}
}

// NewFileSystemScanner creates a new EventProducer instance that scans the
// configured file paths.
func NewFileSystemScanner(c Config) (EventProducer, error) {
// configured file paths. Files and directories in new paths are recorded with
// the action `found`.
func NewFileSystemScanner(c Config, newPathsInConfig map[string]struct{}) (EventProducer, error) {
return &scanner{
log: logp.NewLogger(moduleName).With("scanner_id", atomic.AddUint32(&scannerID, 1)),
config: c,
eventC: make(chan Event, 1),
log: logp.NewLogger(moduleName).With("scanner_id", atomic.AddUint32(&scannerID, 1)),
config: c,
newPaths: newPathsInConfig,
eventC: make(chan Event, 1),
}, nil
}

Expand Down Expand Up @@ -83,7 +86,7 @@ func (s *scanner) Start(done <-chan struct{}) (<-chan Event, error) {

// scan iterates over the configured paths and generates events for each file.
func (s *scanner) scan() {
s.log.Debugw("File system scanner is starting", "file_path", s.config.Paths)
s.log.Debugw("File system scanner is starting", "file_path", s.config.Paths, "new_path", s.newPaths)
defer s.log.Debug("File system scanner is stopping")
defer close(s.eventC)
startTime := time.Now()
Expand All @@ -96,7 +99,13 @@ func (s *scanner) scan() {
continue
}

if err = s.walkDir(evalPath); err != nil {
// If action is None it will be filled later in the Metricset
action := None
if _, exists := s.newPaths[evalPath]; exists {
action = InitialScan
}

if err = s.walkDir(evalPath, action); err != nil {
s.log.Warnw("Failed to scan", "file_path", evalPath, "error", err)
}
}
Expand All @@ -113,7 +122,7 @@ func (s *scanner) scan() {
)
}

func (s *scanner) walkDir(dir string) error {
func (s *scanner) walkDir(dir string, action Action) error {
errDone := errors.New("done")
startTime := time.Now()
err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
Expand All @@ -133,7 +142,7 @@ func (s *scanner) walkDir(dir string) error {
}
defer func() { startTime = time.Now() }()

event := s.newScanEvent(path, info, err)
event := s.newScanEvent(path, info, err, action)
event.rtt = time.Since(startTime)
select {
case s.eventC <- event:
Expand Down Expand Up @@ -192,8 +201,8 @@ func (s *scanner) throttle(fileSize uint64) {
}
}

func (s *scanner) newScanEvent(path string, info os.FileInfo, err error) Event {
event := NewEventFromFileInfo(path, info, err, None, SourceScan,
func (s *scanner) newScanEvent(path string, info os.FileInfo, err error, action Action) Event {
event := NewEventFromFileInfo(path, info, err, action, SourceScan,
s.config.MaxFileSizeBytes, s.config.HashTypes)

// Update metrics.
Expand Down
6 changes: 3 additions & 3 deletions auditbeat/module/file_integrity/scanner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestScanner(t *testing.T) {
}

t.Run("non-recursive", func(t *testing.T) {
reader, err := NewFileSystemScanner(config)
reader, err := NewFileSystemScanner(config, nil)
if err != nil {
t.Fatal(err)
}
Expand All @@ -62,7 +62,7 @@ func TestScanner(t *testing.T) {
c := config
c.Recursive = true

reader, err := NewFileSystemScanner(c)
reader, err := NewFileSystemScanner(c, nil)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -94,7 +94,7 @@ func TestScanner(t *testing.T) {
c := config
c.ScanRateBytesPerSec = 1024 * 5

reader, err := NewFileSystemScanner(c)
reader, err := NewFileSystemScanner(c, nil)
if err != nil {
t.Fatal(err)
}
Expand Down