Skip to content

Commit

Permalink
[Auditbeat] Socket: Add network.transport and network.community_id (#…
Browse files Browse the repository at this point in the history
…12231)

Adds `network.transport` (always `tcp` at the moment) and `network.community_id` to the `socket` dataset.
  • Loading branch information
Christoph Wurm authored May 23, 2019
1 parent 57c54a9 commit 874e01f
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Package: Enable suse. {pull}11634[11634]
- Add support to the system package dataset for the SUSE OS family. {pull}11634[11634]
- Process: Add file hash of process executable. {pull}11722[11722]
- Socket: Add network.transport and network.community_id. {pull}12231[12231]

*Filebeat*

Expand Down
2 changes: 2 additions & 0 deletions x-pack/auditbeat/module/system/socket/_meta/data.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
},
"message": "Inbound socket (10.0.2.2:55270 -\u003e 10.0.2.15:22) CLOSED by process sshd (PID: 22799) and user root (UID: 0)",
"network": {
"community_id": "1:IXrg9Y06W7zrkqBlE30jpC/mzjo=",
"direction": "inbound",
"transport": "tcp",
"type": "ipv4"
},
"process": {
Expand Down
52 changes: 51 additions & 1 deletion x-pack/auditbeat/module/system/socket/socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/cfgwarn"
"github.com/elastic/beats/libbeat/common/flowhash"
"github.com/elastic/beats/libbeat/logp"
sock "github.com/elastic/beats/metricbeat/helper/socket"
"github.com/elastic/beats/metricbeat/mb"
Expand All @@ -39,6 +40,22 @@ const (
eventTypeEvent = "event"
)

type ipProtocol uint8

const (
// TODO: Unify IP protocol constants in Beats
tcp ipProtocol = 6
)

func (proto ipProtocol) String() string {
switch proto {
case tcp:
return "tcp"
default:
return ""
}
}

type eventAction uint8

const (
Expand Down Expand Up @@ -85,6 +102,7 @@ type MetricSet struct {
// Socket represents information about a socket.
type Socket struct {
Family linux.AddressFamily
Protocol ipProtocol
LocalIP net.IP
LocalPort int
RemoteIP net.IP
Expand All @@ -102,6 +120,7 @@ type Socket struct {
func newSocket(diag *linux.InetDiagMsg) *Socket {
return &Socket{
Family: linux.AddressFamily(diag.Family),
Protocol: tcp,
LocalIP: diag.SrcIP(),
LocalPort: diag.SrcPort(),
RemoteIP: diag.DstIP(),
Expand All @@ -126,14 +145,20 @@ func (s Socket) Hash() uint64 {
func (s Socket) toMapStr() common.MapStr {
mapstr := common.MapStr{
"network": common.MapStr{
"type": s.Family.String(),
"direction": s.Direction.String(),
"transport": s.Protocol.String(),
"type": s.Family.String(),
},
"user": common.MapStr{
"id": s.UID,
},
}

communityID := s.communityID()
if communityID != "" {
mapstr.Put("network.community_id", communityID)
}

if s.Username != "" {
mapstr.Put("user.name", s.Username)
}
Expand Down Expand Up @@ -190,6 +215,31 @@ func (s Socket) entityID(hostID string) string {
return h.Sum()
}

// communityID calculates the community ID of this socket.
func (s Socket) communityID() string {
var flow flowhash.Flow

switch s.Direction {
case sock.Inbound:
flow.SourceIP = s.RemoteIP
flow.SourcePort = uint16(s.RemotePort)
flow.DestinationIP = s.LocalIP
flow.DestinationPort = uint16(s.LocalPort)
case sock.Outbound:
flow.SourceIP = s.LocalIP
flow.SourcePort = uint16(s.LocalPort)
flow.DestinationIP = s.RemoteIP
flow.DestinationPort = uint16(s.RemotePort)
default:
// Listening socket, not a flow
return ""
}

flow.Protocol = uint8(s.Protocol)

return flowhash.CommunityID.Hash(flow)
}

// New constructs a new MetricSet.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
cfgwarn.Beta("The %v/%v dataset is beta", moduleName, metricsetName)
Expand Down
17 changes: 17 additions & 0 deletions x-pack/auditbeat/module/system/socket/socket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,18 @@ func TestData(t *testing.T) {
mbtest.WriteEventToDataJSON(t, fullEvent, "")
}

func TestSocket(t *testing.T) {
s := testSocket()

assert.Equal(t, uint64(0xee1186910755e9b1), s.Hash())
assert.Equal(t, "fIj66YRoGyoe8dML", s.entityID("fa8a1edd06864f47ba4cad5d0f5ca134"))
assert.Equal(t, "1:IXrg9Y06W7zrkqBlE30jpC/mzjo=", s.communityID())
}

func testSocket() *Socket {
return &Socket{
Family: linux.AF_INET,
Protocol: tcp,
LocalIP: net.IPv4(10, 0, 2, 15),
LocalPort: 22,
RemoteIP: net.IPv4(10, 0, 2, 2),
Expand Down Expand Up @@ -107,7 +116,13 @@ func TestOutbound(t *testing.T) {
checkFieldValue(t, event.RootFields, "process.name", "socket.test")
checkFieldValue(t, event.RootFields, "user.id", os.Geteuid())
checkFieldValue(t, event.RootFields, "network.direction", sock.Outbound.String())
checkFieldValue(t, event.RootFields, "network.transport", "tcp")
checkFieldValue(t, event.RootFields, "destination.port", 80)

communityID, err := event.RootFields.GetValue("network.community_id")
if assert.NoError(t, err) {
assert.NotEmpty(t, communityID)
}
}

func TestListening(t *testing.T) {
Expand Down Expand Up @@ -153,6 +168,7 @@ func TestListening(t *testing.T) {
checkFieldValue(t, event.RootFields, "process.name", "socket.test")
checkFieldValue(t, event.RootFields, "user.id", os.Geteuid())
checkFieldValue(t, event.RootFields, "network.direction", sock.Listening.String())
checkFieldValue(t, event.RootFields, "network.transport", "tcp")
}

func TestLocalhost(t *testing.T) {
Expand Down Expand Up @@ -203,6 +219,7 @@ func TestLocalhost(t *testing.T) {
checkFieldValue(t, event.RootFields, "process.name", "socket.test")
checkFieldValue(t, event.RootFields, "user.id", os.Geteuid())
checkFieldValue(t, event.RootFields, "network.direction", sock.Listening.String())
checkFieldValue(t, event.RootFields, "network.transport", "tcp")
checkFieldValue(t, event.RootFields, "destination.ip", "127.0.0.1")
}

Expand Down
2 changes: 1 addition & 1 deletion x-pack/auditbeat/tests/system/test_metricsets.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def test_metricset_socket(self):
socket metricset collects information about open sockets on a system.
"""

fields = ["socket.entity_id", "destination.port"]
fields = ["socket.entity_id", "destination.port", "network.direction", "network.transport"]

# errors_allowed=True - The socket metricset fills the `error` field if the process enrichment fails
# (e.g. process has exited). This should not fail the test.
Expand Down

0 comments on commit 874e01f

Please sign in to comment.