From 6e9d39237e538234f09b13aa1b62e9dbc897a332 Mon Sep 17 00:00:00 2001 From: Christoph Wurm Date: Tue, 5 Feb 2019 14:05:58 +0000 Subject: [PATCH] [Auditbeat] System module: Add entity_id fields (#10500) Implements `{entity}.entity_id` as a SHA-256 hash as proposed in https://github.com/elastic/beats/issues/10463. Closes https://github.com/elastic/beats/issues/10463. (cherry picked from commit c047ef7ca532e07fde66531a03ec96b2b1e0673e) --- CHANGELOG.next.asciidoc | 1 + auditbeat/docs/fields.asciidoc | 43 +++++++++++++++++++ .../auditbeat/module/system/_meta/fields.yml | 23 ++++++++++ x-pack/auditbeat/module/system/entity_hash.go | 26 +++++++++++ x-pack/auditbeat/module/system/fields.go | 2 +- .../module/system/package/_meta/data.json | 3 +- .../module/system/package/_meta/fields.yml | 5 +++ .../module/system/package/package.go | 34 ++++++++++----- .../module/system/process/_meta/data.json | 1 + .../module/system/process/process.go | 37 ++++++++++------ .../module/system/process/process_test.go | 4 +- .../module/system/socket/_meta/data.json | 5 ++- .../auditbeat/module/system/socket/socket.go | 40 +++++++++++------ x-pack/auditbeat/module/system/system.go | 34 ++++++++++++++- .../module/system/user/_meta/data.json | 1 + x-pack/auditbeat/module/system/user/user.go | 43 ++++++++++++------- .../auditbeat/tests/system/test_metricsets.py | 13 +++--- 17 files changed, 253 insertions(+), 62 deletions(-) create mode 100644 x-pack/auditbeat/module/system/entity_hash.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 6ed163179cb..1559c57236e 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -152,6 +152,7 @@ https://github.com/elastic/beats/compare/v6.6.0...6.x[Check the HEAD diff] - System module `process` dataset: Add user information to processes. {pull}9963[9963] - Add system `package` dataset. {pull}10225[10225] - Add system module `login` dataset. {pull}9327[9327] +- Add `entity_id` fields. {pull}10500[10500] *Filebeat* diff --git a/auditbeat/docs/fields.asciidoc b/auditbeat/docs/fields.asciidoc index 0a5ae9aa781..93a16db253d 100644 --- a/auditbeat/docs/fields.asciidoc +++ b/auditbeat/docs/fields.asciidoc @@ -3749,6 +3749,16 @@ If the event describes an action, this fields contains the outcome of that actio -- +*`user.entity_id`*:: ++ +-- +type: keyword + +ID uniquely identifying the user on a host. It is computed as a SHA-256 hash of the host ID, user ID, and user name. + + +-- + *`user.terminal`*:: + -- @@ -3757,6 +3767,28 @@ type: keyword Terminal of the user. +-- + + +*`process.entity_id`*:: ++ +-- +type: keyword + +ID uniquely identifying the process. It is computed as a SHA-256 hash of the host ID, PID, and process start time. + + +-- + + +*`socket.entity_id`*:: ++ +-- +type: keyword + +ID uniquely identifying the socket. It is computed as a SHA-256 hash of the host ID, socket inode, local IP, local port, remote IP, and remote port. + + -- [float] @@ -3936,6 +3968,17 @@ The operating system's kernel version. +*`system.audit.package.entity_id`*:: ++ +-- +type: keyword + +ID uniquely identifying the package. It is computed as a SHA-256 hash of the + host ID, package name, and package version. + + +-- + *`system.audit.package.name`*:: + -- diff --git a/x-pack/auditbeat/module/system/_meta/fields.yml b/x-pack/auditbeat/module/system/_meta/fields.yml index b360cf75e82..2ede4c4d684 100644 --- a/x-pack/auditbeat/module/system/_meta/fields.yml +++ b/x-pack/auditbeat/module/system/_meta/fields.yml @@ -28,11 +28,34 @@ - name: user type: group fields: + - name: entity_id + type: keyword + description: > + ID uniquely identifying the user on a host. It is computed as a SHA-256 hash + of the host ID, user ID, and user name. - name: terminal type: keyword description: > Terminal of the user. + - name: process + type: group + fields: + - name: entity_id + type: keyword + description: > + ID uniquely identifying the process. It is computed as a SHA-256 hash of the + host ID, PID, and process start time. + + - name: socket + type: group + fields: + - name: entity_id + type: keyword + description: > + ID uniquely identifying the socket. It is computed as a SHA-256 hash of the + host ID, socket inode, local IP, local port, remote IP, and remote port. + - name: system.audit type: group description: > diff --git a/x-pack/auditbeat/module/system/entity_hash.go b/x-pack/auditbeat/module/system/entity_hash.go new file mode 100644 index 00000000000..ce968a7b461 --- /dev/null +++ b/x-pack/auditbeat/module/system/entity_hash.go @@ -0,0 +1,26 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package system + +import ( + "crypto/sha256" + "encoding/hex" + "hash" +) + +// EntityHash calculates a standard entity hash. +type EntityHash struct { + hash.Hash +} + +// NewEntityHash creates a new EntityHash. +func NewEntityHash() EntityHash { + return EntityHash{sha256.New()} +} + +// Sum returns the hash as a string. +func (h *EntityHash) Sum() string { + return hex.EncodeToString(h.Hash.Sum(nil)) +} diff --git a/x-pack/auditbeat/module/system/fields.go b/x-pack/auditbeat/module/system/fields.go index 7ade7792246..2a063fea81e 100644 --- a/x-pack/auditbeat/module/system/fields.go +++ b/x-pack/auditbeat/module/system/fields.go @@ -18,5 +18,5 @@ func init() { // Asset returns asset data func Asset() string { - return "eJy0Wd9v2zgSfvdfMehLE8BVcMChOPjhgLZX3AZot8Emxe5bPBbHEjcUKXCoOO5fvyBFyXJM2XGjCggQMeT3fZxfHDHv4IG2C+AtO6pmAE46RQt4cxsG3swABHFuZe2k0Qv47wwA4K4kJkBL4EqCtSQlGArSZNGRgNU2jLeYUBnRKMpmAJYUIdMC6KkmKyvSDtUMIsBiNgN4BxorP+ORtAtcblvTAgprmjq8d5P9791sY2UhdRjqFjzQdmOsiGOJPfjnW1gHZh30Bs4M7krJkKOGFQHCWiqCGl0JF5QVGSyvHtFeKVP4n+xfy8t5j2ZsgPGSOshogtxUtdGkHbgSHXBT10qSCFMEOuywNTkl9cPyMtvfXuNyU9H5+7srqVu8v8lZP+V6MBxhVsSAGjD3cHNw3h7Rx7nRDqXmsGaH3KOF7bULM/j8hFWtiLuJHCJmyU2eE/MSUAtYrlGqxtIygz/RaqmLBVzrHm/duMYSPJJlaTT7TXz+dDuHDUGtUIMzUFvzKIV3lZLs/AzMc6odrhTBI6qGeAcXXNRtZw51iEdomGAjXQk5NkF6Zx5qd7CAqHkYoQ2TfXGAOrKV1CHYz3VhXNn5z9NmQx1tiGXYCJlOmATwUOJQZmnY9YOHSEeF+mfpAZa7KGlLggq4IPXa2ApbA+/WjBWF3mfPtA48UDvZZ8VQsTK62Bs+Itk/3wMQSA0atWHKjRacJRhXxrgRToGOzuH8aIwDj5XiiQYkK3+QSJCtjFGE+hy+W3Ig19ETDLjjSAnwwn4YTZl/TQjYD90XCfh9UBY7+O7dq5pDqIEfb++OCjLrNZPLmPIJHH+30+FRfQQc8b5XOZ09fotoKSaZcvpPcsD1/1IUaPNSOsp9fZ2QbAgbT7Wn/7y/f//vy5SIClNe/Anurx8+AQphiZmSvpN1gujZ4AmO65vjFIYTFM+L5wmWpeFB+RxUTMCVaVx77Na+zZK66Er/HsZhudwprBU6j/iMdNzqJ23y7bYHjd7OSTvDc2hWjXbNHDZSC7Phyyyp6CCdXqsmNF+tkq+Y+5G/RqjXWEm1nZS8hYz0lkSJbg6CVhL1HNaWaMXilEViqzOproiZJnwgq0lNx3eXCNG3HGkOpfSxifkDFvSq7iNiHM0g1CA1O1SKhO/YLVXmkUTHP01nMtUpcdOKgrFzIhUsr6RKxErHFu0xJVuEHDugpqQankwpPiVz0tPuLkImz6M2BidpJju6iDnaVbL88fpuuSPzYEmSpqrQbn8CsF2YwmysmtIt3//4clh/+s+5IcU5xccDnDy7/SQGP5Q4vM+vN7/qIAX4zmQPys7AWvI54uvZ9vvUHVcxLdf/vU9HyYS0U2/sLUNpKvLQlDuzH+KDxClJTXgMA9xYU1iswBmwjQZ0oEwhR7oAH5j3g5hNCnH05M42d7wC8AR7VwDwTcMXqZuneMMk20ulgnIT72dGwuGgo+7kmdXflJ8pcBngTnQM25aU+xT2Ymu04bbpYkVbo0X/t7cMtZW+lLWrnvV56TSG46kMJ2LhRZ6APvgP8xqO5tuOXmpHBT1PkTPpx3KvRubE5sY+pE77tgM87t7ea3E2XGjTVut+RDomtT7bk175r/LkhwPZHjaDG8Msd1ef8cq1RGE29709RjAv9jZdIpc+0KVur7IDRrgPv5zvbHsvJONKkVjOR1CX2uyYPUeb7AJ1QdY0DMg+x4ymcOuuTAFSX84Bdco4ATG329oNQTcl6X2XBd947Vfk8qswLICJKh4BdaaLEv+NQDpwhA+DFvHA+4PWEdnd56Xf0HjqHPR07fMiZ9+F/xNs92pMt9ENchAAUUA2+ycAAP//fzXZ0w==" + return "eJzEWV1v2zgWffevuOhLE8BVsIvdYuGHBfqFqYF2GkxSzLzF1+K1xAnFqyGpOO6vH5CiZNmW7DhRMQICWAx5zrlf1BX1Bu5pMwO7sY6KCYCTTtEMXt2EgVcTAEE2NbJ0kvUM/j8BALjNyRKgIXA5wUqSEhYy0mTQkYDlJozXmFCwqBQlEwBDitDSDOixJCML0g7VBCLAbDIBeAMaCz/jgbQLXG5T0gwyw1UZ7pvJ/nczm43MpA5DzYJ72qzZiDjWY4O/voV1wKugN3AmcJtLCylqWBIgrKQiKNHlcEFJlsDi6gHNleLM/yX/WlxOWzQ2AcZLaiCjC1IuStakHbgcHdiqLJUkEaYIdNhga3JK6vvFZbJrXuVSLuh8+25zahbvGjlpp8w7wxFmSRZQA6YebgrO+yPGOGXtUGob1myRW7RgXr0wgU+PWJSKbDPRhoxZ2CpNydoFoBawWKFUlaFFAr+j0VJnM5jrFm9VucoQPJCxkrX1Rnz6cDOFNUGpUINjKA0/SOFDpaR1fgamKZUOl4rgAVVFdgsXQtSYM4Uy5CNUlmAtXQ4pVkF64x6qLZhB1NzN0MqSeXKCknbSbe6kOD+G849QaflXRWoDUnig1UbqLETAawDWgJCzdQnMHfjc5aKsfB2iBYSbz+/e/Pu/byFHm29TtY66XwXzj9MayP/wMQk3XvduFjoyhdShYM9Nw7iyofUESdeXpeHg3n/YnVHGaT9GQ1rQ1o/XjQ8jFFiHxoGT3pkdgy2n9/T0/e0n2VureIG5NQBIzYKmoDhFBfPr5lfJxk3BUMGOwrB3TLz1/9v1SNgpE6yE7PdLj31dV3Xd5fW1g4dIR/3lr4UHWGw3u/rJpmq7pV6xKbDeJ7Zrhp5t7dazp7WzkZQ+PXYE1IoV62xn+Ihkf30PQCA1aNRsKWUtbNLDuGR2A5wCHZ3D+Z65Se9DnuhAMvIHiR6yJbMi1Ofw3fh0W8VI+DxtOfoEeGE/WFPib3sE7FbQkwT82nm6N/Dd7XQK4VH+/ub2qCBerSy5xFI6QuBvtzo8qs+AI9H3Ksfzx+eI1sck+4L+TA6Yf+yjQJPm0lHq24QRybqwsTl7/N/bu7f/uewTUWBfFJ/B/fXdB0AhDFlLvbGTZQ/R3uAJjvn1cQq2PRT7m+cJlgXbzvbZ2TEBl1y5unss/duCfxTFrX8H43C77HQLCp1H3CMd9vpJn3y7aUFjtFPSju0UqmWlXTWFtdSC1/Yy6VV0UE4vVRPeIWolXzH1I38MUK+wkGozKnkNGekNiRzdFAQtJeoprAzR0opTHokd+6i6ImY/4T0ZTWo8vtueFH1tI82hlDY3Mb3HjF7UfUSMoxWEGqS2DpUi4V88fVv1QKLhH6cz2W87Tzn1qEuPNt5R9dmdaHO1HWlECgbETjyO9GTPYPk+08TrDnkfT19RvJDqiFUx7mOyRcihB/GYVN0ncB+fkinpca2LkL3P3brWRmmaG7qIOdg9W/nj5W8FDZkH6yWpigLN5hmA9cI+zMqoMcPy/bcvh/tse/rSpThnk/UAJ3sUP8nWByyHTcr5++rPahgAvu8e2Rx4S+4jvpxttx/fcmXjcv3iYzpIJqQZ27DXFnIuyENT6ng3xTuFk5Masd0AuDacGSzAMZhKAzpQnMmBbscn5l0nZ3uFOHp0Z7s7HnWEM8DuUQd80/BF6uoxHgjL+gw4o5TjcepAOhy8OTTyePknpWcKXAS4E53Rpia12yNSaaFEEw6HL5a0YS3a/722UBrpt7J61V4/21/GcLyU4UQuPCkS0Cb/YV3D0Xrb0kvtKKP9EjmTfqj2SrS2x7ihF8bTsW0Aj4e3jVqcDReaXewi44h0ltTq7Eh65T8rku8OZHvYBK7ZWrn9UhG/kOQoeH3X+mMA82LH6NAe+8LU9dluwAifry6nW9/eCWlxqUgspgOoC81bZs9RF7tAnZHhyoamXG9YU/hIpjgDqS9Drz2EmJpN6bqg65z0bshCbLz2K3LpVRgWYIkKOwDquMkS/y5EOnCEF6Aa8SD6ndYRrbtLc2/QcOkc9HT19aRg34bPepudPaYxdI02CIAoIJn8HQAA//9sRPQr" } diff --git a/x-pack/auditbeat/module/system/package/_meta/data.json b/x-pack/auditbeat/module/system/package/_meta/data.json index ca570150245..5071146db3b 100644 --- a/x-pack/auditbeat/module/system/package/_meta/data.json +++ b/x-pack/auditbeat/module/system/package/_meta/data.json @@ -7,7 +7,7 @@ "event": { "action": "existing_package", "dataset": "package", - "id": "9ac4ea4c-5a0c-475f-b4c9-ec9d981ff11b", + "id": "ed069c3f-1d30-4e17-845b-cc915cf108b4", "kind": "state", "module": "system" }, @@ -18,6 +18,7 @@ "system": { "audit": { "package": { + "entity_id": "c2c455d9f99375d9fefc61da52fa93778976d54b1964164778058980531d77dc", "installtime": "2018-08-30T18:41:23.85657356+01:00", "name": "zstd", "summary": "Zstandard is a real-time compression algorithm", diff --git a/x-pack/auditbeat/module/system/package/_meta/fields.yml b/x-pack/auditbeat/module/system/package/_meta/fields.yml index d87ba9aac06..67c894089fa 100644 --- a/x-pack/auditbeat/module/system/package/_meta/fields.yml +++ b/x-pack/auditbeat/module/system/package/_meta/fields.yml @@ -4,6 +4,11 @@ `package` contains information about an installed or removed package. release: experimental fields: + - name: entity_id + type: keyword + description: > + ID uniquely identifying the package. It is computed as a SHA-256 hash of the + host ID, package name, and package version. - name: name type: keyword description: > diff --git a/x-pack/auditbeat/module/system/package/package.go b/x-pack/auditbeat/module/system/package/package.go index f25dbc2fbc4..a4eca344453 100644 --- a/x-pack/auditbeat/module/system/package/package.go +++ b/x-pack/auditbeat/module/system/package/package.go @@ -28,6 +28,7 @@ import ( "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/x-pack/auditbeat/cache" + "github.com/elastic/beats/x-pack/auditbeat/module/system" "github.com/elastic/go-sysinfo" "github.com/elastic/go-sysinfo/types" ) @@ -85,7 +86,7 @@ func init() { // MetricSet collects data about the system's packages. type MetricSet struct { - mb.BaseMetricSet + system.SystemMetricSet config config log *logp.Logger cache *cache.Cache @@ -155,6 +156,15 @@ func (pkg Package) toMapStr() common.MapStr { return mapstr } +// entityID creates an ID that uniquely identifies this package across machines. +func (pkg Package) entityID(hostID string) string { + h := system.NewEntityHash() + h.Write([]byte(hostID)) + h.Write([]byte(pkg.Name)) + h.Write([]byte(pkg.Version)) + return h.Sum() +} + func getOS() (*types.OSInfo, error) { host, err := sysinfo.Host() if err != nil { @@ -184,11 +194,11 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { } ms := &MetricSet{ - BaseMetricSet: base, - config: config, - log: logp.NewLogger(metricsetName), - cache: cache.New(), - bucket: bucket, + SystemMetricSet: system.NewSystemMetricSet(base), + config: config, + log: logp.NewLogger(metricsetName), + cache: cache.New(), + bucket: bucket, } osInfo, err := getOS() @@ -282,7 +292,7 @@ func (ms *MetricSet) reportState(report mb.ReporterV2) error { return errors.Wrap(err, "error generating state ID") } for _, pkg := range packages { - event := packageEvent(pkg, eventTypeState, eventActionExistingPackage) + event := ms.packageEvent(pkg, eventTypeState, eventActionExistingPackage) event.RootFields.Put("event.id", stateID.String()) report.Event(event) } @@ -327,19 +337,19 @@ func (ms *MetricSet) reportChanges(report mb.ReporterV2) error { if missingPkg.Name == newPkg.Name { found = true updated[newPkg.Name] = struct{}{} - report.Event(packageEvent(newPkg, eventTypeEvent, eventActionPackageUpdated)) + report.Event(ms.packageEvent(newPkg, eventTypeEvent, eventActionPackageUpdated)) break } } if !found { - report.Event(packageEvent(missingPkg, eventTypeEvent, eventActionPackageRemoved)) + report.Event(ms.packageEvent(missingPkg, eventTypeEvent, eventActionPackageRemoved)) } } for _, newPkg := range newPackages { if _, contains := updated[newPkg.Name]; !contains { - report.Event(packageEvent(newPkg, eventTypeEvent, eventActionPackageInstalled)) + report.Event(ms.packageEvent(newPkg, eventTypeEvent, eventActionPackageInstalled)) } } @@ -360,7 +370,7 @@ func convertToPackage(cacheValues []interface{}) []*Package { return packages } -func packageEvent(pkg *Package, eventType string, action eventAction) mb.Event { +func (ms *MetricSet) packageEvent(pkg *Package, eventType string, action eventAction) mb.Event { event := mb.Event{ RootFields: common.MapStr{ "event": common.MapStr{ @@ -372,6 +382,8 @@ func packageEvent(pkg *Package, eventType string, action eventAction) mb.Event { MetricSetFields: pkg.toMapStr(), } + event.MetricSetFields.Put("entity_id", pkg.entityID(ms.HostID())) + if pkg.Error != nil { event.RootFields.Put("error.message", pkg.Error.Error()) } diff --git a/x-pack/auditbeat/module/system/process/_meta/data.json b/x-pack/auditbeat/module/system/process/_meta/data.json index c649f77710d..dc7d3b23d5a 100644 --- a/x-pack/auditbeat/module/system/process/_meta/data.json +++ b/x-pack/auditbeat/module/system/process/_meta/data.json @@ -15,6 +15,7 @@ "args": [ "zsh" ], + "entity_id": "e2e0c5f51b093b71afed6af23debc906090d2f2f2afa9b930bf7e0803a6b53d5", "executable": "/bin/zsh", "name": "zsh", "pid": 12936, diff --git a/x-pack/auditbeat/module/system/process/process.go b/x-pack/auditbeat/module/system/process/process.go index 7effb37728e..408f5d289fc 100644 --- a/x-pack/auditbeat/module/system/process/process.go +++ b/x-pack/auditbeat/module/system/process/process.go @@ -5,6 +5,7 @@ package process import ( + "encoding/binary" "fmt" "os" "os/user" @@ -21,6 +22,7 @@ import ( "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/x-pack/auditbeat/cache" + "github.com/elastic/beats/x-pack/auditbeat/module/system" "github.com/elastic/go-sysinfo" "github.com/elastic/go-sysinfo/types" ) @@ -71,7 +73,7 @@ func init() { // MetricSet collects data about the host. type MetricSet struct { - mb.BaseMetricSet + system.SystemMetricSet config Config cache *cache.Cache log *logp.Logger @@ -111,6 +113,15 @@ func (p Process) toMapStr() common.MapStr { } } +// entityID creates an ID that uniquely identifies this process across machines. +func (p Process) entityID(hostID string) string { + h := system.NewEntityHash() + h.Write([]byte(hostID)) + binary.Write(h, binary.LittleEndian, int64(p.Info.PID)) + binary.Write(h, binary.LittleEndian, int64(p.Info.StartTime.Nanosecond())) + return h.Sum() +} + // New constructs a new MetricSet. func New(base mb.BaseMetricSet) (mb.MetricSet, error) { cfgwarn.Experimental("The %v/%v dataset is experimental", moduleName, metricsetName) @@ -126,11 +137,11 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { } ms := &MetricSet{ - BaseMetricSet: base, - config: config, - log: logp.NewLogger(metricsetName), - cache: cache.New(), - bucket: bucket, + SystemMetricSet: system.NewSystemMetricSet(base), + config: config, + log: logp.NewLogger(metricsetName), + cache: cache.New(), + bucket: bucket, } // Load from disk: Time when state was last sent @@ -204,12 +215,12 @@ func (ms *MetricSet) reportState(report mb.ReporterV2) error { } for _, p := range processes { if p.Error == nil { - event := processEvent(p, eventTypeState, eventActionExistingProcess) + event := ms.processEvent(p, eventTypeState, eventActionExistingProcess) event.RootFields.Put("event.id", stateID.String()) report.Event(event) } else { ms.log.Warn(p.Error) - report.Event(processEvent(p, eventTypeError, eventActionProcessError)) + report.Event(ms.processEvent(p, eventTypeError, eventActionProcessError)) } } @@ -245,10 +256,10 @@ func (ms *MetricSet) reportChanges(report mb.ReporterV2) error { p := cacheValue.(*Process) if p.Error == nil { - report.Event(processEvent(p, eventTypeEvent, eventActionProcessStarted)) + report.Event(ms.processEvent(p, eventTypeEvent, eventActionProcessStarted)) } else { ms.log.Warn(p.Error) - report.Event(processEvent(p, eventTypeError, eventActionProcessError)) + report.Event(ms.processEvent(p, eventTypeError, eventActionProcessError)) } } @@ -256,14 +267,14 @@ func (ms *MetricSet) reportChanges(report mb.ReporterV2) error { p := cacheValue.(*Process) if p.Error == nil { - report.Event(processEvent(p, eventTypeEvent, eventActionProcessStopped)) + report.Event(ms.processEvent(p, eventTypeEvent, eventActionProcessStopped)) } } return nil } -func processEvent(process *Process, eventType string, action eventAction) mb.Event { +func (ms *MetricSet) processEvent(process *Process, eventType string, action eventAction) mb.Event { event := mb.Event{ RootFields: common.MapStr{ "event": common.MapStr{ @@ -302,6 +313,8 @@ func processEvent(process *Process, eventType string, action eventAction) mb.Eve event.RootFields.Put("error.message", process.Error.Error()) } + event.RootFields.Put("process.entity_id", process.entityID(ms.HostID())) + return event } diff --git a/x-pack/auditbeat/module/system/process/process_test.go b/x-pack/auditbeat/module/system/process/process_test.go index f9bba759a52..d86120e5ac7 100644 --- a/x-pack/auditbeat/module/system/process/process_test.go +++ b/x-pack/auditbeat/module/system/process/process_test.go @@ -42,6 +42,8 @@ func getConfig() map[string]interface{} { } func TestProcessEvent(t *testing.T) { + ms := mbtest.NewReportingMetricSetV2(t, getConfig()).(*MetricSet) + process := Process{ Info: types.ProcessInfo{ Name: "zsh", @@ -72,7 +74,7 @@ func TestProcessEvent(t *testing.T) { eventType := eventTypeEvent eventAction := eventActionProcessStarted - event := processEvent(&process, eventType, eventAction) + event := ms.processEvent(&process, eventType, eventAction) containsError, err := event.RootFields.HasKey("error") if assert.NoError(t, err) { diff --git a/x-pack/auditbeat/module/system/socket/_meta/data.json b/x-pack/auditbeat/module/system/socket/_meta/data.json index a373c3cf179..6ae45ea0863 100644 --- a/x-pack/auditbeat/module/system/socket/_meta/data.json +++ b/x-pack/auditbeat/module/system/socket/_meta/data.json @@ -27,6 +27,9 @@ "service": { "type": "system" }, + "socket": { + "entity_id": "d85bf25935c0ebbabc053024d4954ddd78979bd1390364ac46395c390ed7a6df" + }, "source": { "ip": "10.0.2.2", "port": 55270 @@ -35,4 +38,4 @@ "id": 0, "name": "root" } -} \ No newline at end of file +} diff --git a/x-pack/auditbeat/module/system/socket/socket.go b/x-pack/auditbeat/module/system/socket/socket.go index b69148e4529..f5cadeb9c79 100644 --- a/x-pack/auditbeat/module/system/socket/socket.go +++ b/x-pack/auditbeat/module/system/socket/socket.go @@ -7,6 +7,7 @@ package socket import ( + "encoding/binary" "fmt" "net" "os/user" @@ -25,6 +26,7 @@ import ( sock "github.com/elastic/beats/metricbeat/helper/socket" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/x-pack/auditbeat/cache" + "github.com/elastic/beats/x-pack/auditbeat/module/system" "github.com/elastic/gosigar/sys/linux" ) @@ -67,7 +69,7 @@ func init() { // MetricSet collects data about sockets. type MetricSet struct { - mb.BaseMetricSet + system.SystemMetricSet config Config cache *cache.Cache log *logp.Logger @@ -176,6 +178,18 @@ func (s Socket) toMapStr() common.MapStr { return mapstr } +// entityID creates an ID that uniquely identifies this socket across machines. +func (s Socket) entityID(hostID string) string { + h := system.NewEntityHash() + h.Write([]byte(hostID)) + binary.Write(h, binary.LittleEndian, int64(s.Inode)) + h.Write(s.LocalIP) + h.Write(s.RemoteIP) + binary.Write(h, binary.LittleEndian, int64(s.LocalPort)) + binary.Write(h, binary.LittleEndian, int64(s.RemotePort)) + return h.Sum() +} + // ecsDirectionString is a custom alternative to the existing String() // to be compatible with recommended ECS values for network.direction. func ecsDirectionString(direction sock.Direction) string { @@ -206,13 +220,13 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { } ms := &MetricSet{ - BaseMetricSet: base, - config: config, - log: logp.NewLogger(metricsetName), - cache: cache.New(), - netlink: sock.NewNetlinkSession(), - ptable: ptable, - listeners: sock.NewListenerTable(), + SystemMetricSet: system.NewSystemMetricSet(base), + config: config, + log: logp.NewLogger(metricsetName), + cache: cache.New(), + netlink: sock.NewNetlinkSession(), + ptable: ptable, + listeners: sock.NewListenerTable(), } return ms, nil @@ -262,7 +276,7 @@ func (ms *MetricSet) reportState(report mb.ReporterV2) error { return err } - event := socketEvent(socket, eventTypeState, eventActionExistingSocket) + event := ms.socketEvent(socket, eventTypeState, eventActionExistingSocket) event.RootFields.Put("event.id", stateID.String()) report.Event(event) } @@ -294,18 +308,18 @@ func (ms *MetricSet) reportChanges(report mb.ReporterV2) error { return err } - report.Event(socketEvent(s.(*Socket), eventTypeEvent, eventActionSocketOpened)) + report.Event(ms.socketEvent(s.(*Socket), eventTypeEvent, eventActionSocketOpened)) } } for _, s := range closed { - report.Event(socketEvent(s.(*Socket), eventTypeEvent, eventActionSocketClosed)) + report.Event(ms.socketEvent(s.(*Socket), eventTypeEvent, eventActionSocketClosed)) } return nil } -func socketEvent(socket *Socket, eventType string, action eventAction) mb.Event { +func (ms *MetricSet) socketEvent(socket *Socket, eventType string, action eventAction) mb.Event { event := mb.Event{ RootFields: socket.toMapStr(), } @@ -314,6 +328,8 @@ func socketEvent(socket *Socket, eventType string, action eventAction) mb.Event event.RootFields.Put("event.action", action.String()) event.RootFields.Put("message", socketMessage(socket, action)) + event.RootFields.Put("socket.entity_id", socket.entityID(ms.HostID())) + return event } diff --git a/x-pack/auditbeat/module/system/system.go b/x-pack/auditbeat/module/system/system.go index 572ab654de5..af567056004 100644 --- a/x-pack/auditbeat/module/system/system.go +++ b/x-pack/auditbeat/module/system/system.go @@ -5,7 +5,10 @@ package system import ( + "github.com/pkg/errors" + "github.com/elastic/beats/metricbeat/mb" + "github.com/elastic/go-sysinfo" ) func init() { @@ -28,6 +31,7 @@ type SystemModuleConfig struct { type SystemModule struct { mb.BaseModule config SystemModuleConfig + hostID string } // Config returns the ModuleConfig used to create the Module. @@ -45,5 +49,33 @@ func NewModule(base mb.BaseModule) (mb.Module, error) { return nil, err } - return &SystemModule{BaseModule: base, config: config}, nil + hostInfo, err := sysinfo.Host() + if err != nil { + return nil, errors.Wrap(err, "failed to get host ID") + } + + return &SystemModule{ + BaseModule: base, + config: config, + hostID: hostInfo.Info().UniqueID, + }, nil +} + +// SystemMetricSet extends the Metricbeat BaseMetricSet. +type SystemMetricSet struct { + mb.BaseMetricSet + module *SystemModule +} + +// NewSystemMetricSet creates a new SystemMetricSet. +func NewSystemMetricSet(base mb.BaseMetricSet) SystemMetricSet { + return SystemMetricSet{ + BaseMetricSet: base, + module: base.Module().(*SystemModule), + } +} + +// HostID returns the stored host ID. +func (ms *SystemMetricSet) HostID() string { + return ms.module.hostID } diff --git a/x-pack/auditbeat/module/system/user/_meta/data.json b/x-pack/auditbeat/module/system/user/_meta/data.json index e885e34c718..65a49cd5073 100644 --- a/x-pack/auditbeat/module/system/user/_meta/data.json +++ b/x-pack/auditbeat/module/system/user/_meta/data.json @@ -42,6 +42,7 @@ } }, "user": { + "entity_id": "4a80efe9ab38d1bb28aaa207d03ef24a702602d0ae55cc4661946fa1d8eee6b5", "id": "1002", "name": "elastic" } diff --git a/x-pack/auditbeat/module/system/user/user.go b/x-pack/auditbeat/module/system/user/user.go index 4cd76721028..8c3f79fd210 100644 --- a/x-pack/auditbeat/module/system/user/user.go +++ b/x-pack/auditbeat/module/system/user/user.go @@ -28,6 +28,7 @@ import ( "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/x-pack/auditbeat/cache" + "github.com/elastic/beats/x-pack/auditbeat/module/system" ) const ( @@ -170,6 +171,15 @@ func (user User) toMapStr() common.MapStr { return evt } +// entityID creates an ID that uniquely identifies this user across machines. +func (u User) entityID(hostID string) string { + h := system.NewEntityHash() + h.Write([]byte(hostID)) + h.Write([]byte(u.Name)) + h.Write([]byte(u.UID)) + return h.Sum() +} + func init() { mb.Registry.MustAddMetricSet(moduleName, metricsetName, New, mb.DefaultMetricSet(), @@ -179,7 +189,7 @@ func init() { // MetricSet collects data about a system's users. type MetricSet struct { - mb.BaseMetricSet + system.SystemMetricSet config config log *logp.Logger cache *cache.Cache @@ -207,11 +217,11 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { } ms := &MetricSet{ - BaseMetricSet: base, - config: config, - log: logp.NewLogger(metricsetName), - cache: cache.New(), - bucket: bucket, + SystemMetricSet: system.NewSystemMetricSet(base), + config: config, + log: logp.NewLogger(metricsetName), + cache: cache.New(), + bucket: bucket, } if ms.config.DetectPasswordChanges { @@ -291,7 +301,7 @@ func (ms *MetricSet) reportState(report mb.ReporterV2) error { return errors.Wrap(err, "error generating state ID") } for _, user := range users { - event := userEvent(user, eventTypeState, eventActionExistingUser) + event := ms.userEvent(user, eventTypeState, eventActionExistingUser) event.RootFields.Put("event.id", stateID.String()) report.Event(event) } @@ -360,7 +370,7 @@ func (ms *MetricSet) reportChanges(report mb.ReporterV2) error { newUser.PasswordType != oldUser.PasswordType if passwordChanged { - report.Event(userEvent(newUser, eventTypeEvent, eventActionPasswordChanged)) + report.Event(ms.userEvent(newUser, eventTypeEvent, eventActionPasswordChanged)) } } @@ -369,26 +379,26 @@ func (ms *MetricSet) reportChanges(report mb.ReporterV2) error { oldUser.PasswordHashHash = newUser.PasswordHashHash oldUser.PasswordType = newUser.PasswordType if newUser.Hash() != oldUser.Hash() { - report.Event(userEvent(newUser, eventTypeEvent, eventActionUserChanged)) + report.Event(ms.userEvent(newUser, eventTypeEvent, eventActionUserChanged)) } delete(missingUserMap, oldUser.UID) } else { - report.Event(userEvent(newUser, eventTypeEvent, eventActionUserAdded)) + report.Event(ms.userEvent(newUser, eventTypeEvent, eventActionUserAdded)) } } for _, missingUser := range missingUserMap { - report.Event(userEvent(missingUser, eventTypeEvent, eventActionUserRemoved)) + report.Event(ms.userEvent(missingUser, eventTypeEvent, eventActionUserRemoved)) } } else { // No changes to users for _, user := range newInCache { - report.Event(userEvent(user.(*User), eventTypeEvent, eventActionUserAdded)) + report.Event(ms.userEvent(user.(*User), eventTypeEvent, eventActionUserAdded)) } for _, user := range missingFromCache { - report.Event(userEvent(user.(*User), eventTypeEvent, eventActionUserRemoved)) + report.Event(ms.userEvent(user.(*User), eventTypeEvent, eventActionUserRemoved)) } } @@ -399,7 +409,7 @@ func (ms *MetricSet) reportChanges(report mb.ReporterV2) error { return nil } -func userEvent(user *User, eventType string, action eventAction) mb.Event { +func (ms *MetricSet) userEvent(user *User, eventType string, action eventAction) mb.Event { return mb.Event{ RootFields: common.MapStr{ "event": common.MapStr{ @@ -407,8 +417,9 @@ func userEvent(user *User, eventType string, action eventAction) mb.Event { "action": action.String(), }, "user": common.MapStr{ - "id": user.UID, - "name": user.Name, + "entity_id": user.entityID(ms.HostID()), + "id": user.UID, + "name": user.Name, }, "message": userMessage(user, action), }, diff --git a/x-pack/auditbeat/tests/system/test_metricsets.py b/x-pack/auditbeat/tests/system/test_metricsets.py index f44535ead3e..02858d84c30 100644 --- a/x-pack/auditbeat/tests/system/test_metricsets.py +++ b/x-pack/auditbeat/tests/system/test_metricsets.py @@ -17,7 +17,8 @@ def test_metricset_host(self): host metricset collects general information about a server. """ - fields = ["system.audit.host.uptime", "system.audit.host.ip", "system.audit.host.os.name"] + fields = ["system.audit.host.id", "system.audit.host.uptime", "system.audit.host.ip", + "system.audit.host.os.name"] # Metricset is experimental and that generates a warning, TODO: remove later self.check_metricset("system", "host", COMMON_FIELDS + fields, warnings_allowed=True) @@ -48,7 +49,7 @@ def test_metricset_package(self): package metricset collects information about installed packages on a system. """ - fields = ["system.audit.package.name", "system.audit.package.version"] + fields = ["system.audit.package.entity_id", "system.audit.package.name", "system.audit.package.version"] # Metricset is experimental and that generates a warning, TODO: remove later self.check_metricset("system", "package", COMMON_FIELDS + fields, warnings_allowed=True) @@ -58,8 +59,8 @@ def test_metricset_process(self): process metricset collects information about processes running on a system. """ - fields = ["process.pid", "process.ppid", "process.name", "process.executable", "process.args", - "process.start", "process.working_directory", "user.id", "user.group.id"] + fields = ["process.entity_id", "process.pid", "process.ppid", "process.name", "process.executable", + "process.args", "process.start", "process.working_directory", "user.id", "user.group.id"] # Windows does not have effective and saved IDs, and user.name is not always filled for system processes. if sys.platform != "win32": @@ -75,7 +76,7 @@ def test_metricset_socket(self): socket metricset collects information about open sockets on a system. """ - fields = ["destination.port"] + fields = ["socket.entity_id", "destination.port"] # errors_allowed=True - The socket metricset fills the `error` field if the process enrichment fails # (e.g. process has exited). This should not fail the test. @@ -88,7 +89,7 @@ def test_metricset_user(self): user metricset collects information about users on a server. """ - fields = ["system.audit.user.name"] + fields = ["user.entity_id", "system.audit.user.name"] # Metricset is experimental and that generates a warning, TODO: remove later self.check_metricset("system", "user", COMMON_FIELDS + fields, warnings_allowed=True)