From ce41d1a29d44e24115b589a51c6b32dafd257259 Mon Sep 17 00:00:00 2001 From: Tigran Mkrtchyan Date: Tue, 22 Mar 2016 17:01:38 +0100 Subject: [PATCH] packetbeat: add support for NFS v3 and v4 protocols Signed-off-by: Tigran Mkrtchyan --- packetbeat/docs/fields.asciidoc | 97 +++++++ packetbeat/etc/beat.yml | 5 + packetbeat/etc/fields.yml | 69 +++++ packetbeat/etc/sample_outputs/nfs.json | 41 +++ packetbeat/main.go | 1 + packetbeat/packetbeat.yml | 5 + packetbeat/protos/nfs/README.md | 53 ++++ packetbeat/protos/nfs/config.go | 18 ++ packetbeat/protos/nfs/nfs.go | 42 +++ packetbeat/protos/nfs/nfs3.go | 34 +++ packetbeat/protos/nfs/nfs4.go | 241 ++++++++++++++++++ packetbeat/protos/nfs/nfs_status.go | 112 ++++++++ packetbeat/protos/nfs/rpc.go | 238 +++++++++++++++++ packetbeat/protos/nfs/rpc_message.go | 119 +++++++++ packetbeat/protos/nfs/xdr.go | 59 +++++ packetbeat/protos/nfs/xdr_test.go | 26 ++ .../tests/system/config/packetbeat.yml.j2 | 3 + packetbeat/tests/system/pcaps/nfs_v3.pcap | Bin 0 -> 1308 bytes packetbeat/tests/system/pcaps/nfs_v4.pcap | Bin 0 -> 1420 bytes packetbeat/tests/system/test_0061_nfs.py | 58 +++++ 20 files changed, 1221 insertions(+) create mode 100644 packetbeat/etc/sample_outputs/nfs.json create mode 100644 packetbeat/protos/nfs/README.md create mode 100644 packetbeat/protos/nfs/config.go create mode 100644 packetbeat/protos/nfs/nfs.go create mode 100644 packetbeat/protos/nfs/nfs3.go create mode 100644 packetbeat/protos/nfs/nfs4.go create mode 100644 packetbeat/protos/nfs/nfs_status.go create mode 100644 packetbeat/protos/nfs/rpc.go create mode 100644 packetbeat/protos/nfs/rpc_message.go create mode 100644 packetbeat/protos/nfs/xdr.go create mode 100644 packetbeat/protos/nfs/xdr_test.go create mode 100644 packetbeat/tests/system/pcaps/nfs_v3.pcap create mode 100644 packetbeat/tests/system/pcaps/nfs_v4.pcap create mode 100644 packetbeat/tests/system/test_0061_nfs.py diff --git a/packetbeat/docs/fields.asciidoc b/packetbeat/docs/fields.asciidoc index 572a7db7a0a..49b04061ece 100644 --- a/packetbeat/docs/fields.asciidoc +++ b/packetbeat/docs/fields.asciidoc @@ -24,6 +24,7 @@ grouped in the following categories: * <> * <> * <> +* <> [[exported-fields-flows_event]] === Flow Event Fields @@ -1453,6 +1454,102 @@ A BSON document that specifies the update to be performed. For information on sp The cursor identifier returned in the OP_REPLY. This must be the value that was returned from the database. +=== rpc Fields + +OncRPC specific event fields. + + +==== rpc.xid + +RPC message transaction identifier. + +==== rpc.call_size + +type: number + +RPC call size with argument. + +==== rpc.reply_size + +type: number + +RPC reply size with argument. + +==== rpc.status + +RPC message reply status. + +==== rpc.time + +type: number + +RPC message processing time. + +==== rpc.time_str + +RPC message processing time in human readable form. + +==== rpc.auth_flavor + +RPC authentication flavor. + +==== rpc.cred.uid + +type: number + +RPC caller's user id, in case of auth-unix. + +==== rpc.cred.gid + +type: number + +RPC caller's group id, in case of auth-unix. + +==== rpc.cred.gids + +RPC caller's secondary group ids, in case of auth-unix. + +==== rpc.cred.stamp + +type: number + +Arbitrary ID which the caller machine may generate. + +==== rpc.cred.machinename + +The name of the caller's machine. + +[[exported-fields-nfs]] +=== NFS Fields + +NFS v4/3 specific event fields. + + +==== nfs.version + +type: number + +NFS protocol version number. + +==== nfs.minor_version + +type: number + +NFS protocol minor version number. + +==== nfs.tag + +NFS v4 COMPOUND operation tag. + +==== nfs.opcode + +NFS operation name, or main operation name, in case of COMPOUND calls. + + +==== nfs.status + +NFS operation reply status. + [[exported-fields-trans_measurements]] === Measurements (Transactions) Fields diff --git a/packetbeat/etc/beat.yml b/packetbeat/etc/beat.yml index aa4aa1c1ca1..4b34ff4abdf 100644 --- a/packetbeat/etc/beat.yml +++ b/packetbeat/etc/beat.yml @@ -146,6 +146,11 @@ protocols: # the MongoDB protocol by commenting out the list of ports. ports: [27017] + nfs: + # Configure the ports where to listen for NFS traffic. You can disable + # the NFS protocol by commenting out the list of ports. + ports: [2049] + ############################# Processes ####################################### # Configure the processes to be monitored and how to find them. If a process is diff --git a/packetbeat/etc/fields.yml b/packetbeat/etc/fields.yml index 59464372d33..ac89651432c 100644 --- a/packetbeat/etc/fields.yml +++ b/packetbeat/etc/fields.yml @@ -1288,6 +1288,74 @@ trans_event: description: > The cursor identifier returned in the OP_REPLY. This must be the value that was returned from the database. + - name: rpc + type: group + description: OncRPC specific event fields. + fields: + - name: xid + description: RPC message transaction identifier. + + - name: call_size + type: number + description: RPC call size with argument. + + - name: reply_size + type: number + description: RPC reply size with argument. + + - name: status + description: RPC message reply status. + + - name: time + type: number + description: RPC message processing time. + + - name: time_str + description: RPC message processing time in human readable form. + + - name: auth_flavor + description: RPC authentication flavor. + + - name: cred.uid + type: number + description: RPC caller's user id, in case of auth-unix. + + - name: cred.gid + type: number + description: RPC caller's group id, in case of auth-unix. + + - name: cred.gids + description: RPC caller's secondary group ids, in case of auth-unix. + + - name: cred.stamp + type: number + description: Arbitrary ID which the caller machine may generate. + + - name: cred.machinename + description: The name of the caller's machine. + + - name: nfs + type: group + description: NFS v4/3 specific event fields. + fields: + - name: version + type: number + description: NFS protocol version number. + + - name: minor_version + type: number + description: NFS protocol minor version number. + + - name: tag + description: NFS v4 COMPOUND operation tag. + + - name: opcode + description: > + NFS operation name, or main operation name, in case of COMPOUND + calls. + + - name: status + description: NFS operation reply status. raw: type: group description: These fields contain the raw transaction data. @@ -1383,3 +1451,4 @@ sections: - ["trans_env", "Environmental (Transactions)"] - ["flows_env", "Environmental (Flows)"] - ["raw", "Raw"] + - ["nfs", "NFS"] diff --git a/packetbeat/etc/sample_outputs/nfs.json b/packetbeat/etc/sample_outputs/nfs.json new file mode 100644 index 00000000000..0da67352f47 --- /dev/null +++ b/packetbeat/etc/sample_outputs/nfs.json @@ -0,0 +1,41 @@ +{ + "@timestamp": "2016-03-28T06:18:18.431Z", + "beat": { + "hostname": "localhost", + "name": "localhost" + }, + "count": 1, + "dst": "127.0.0.1", + "dst_port": 2049, + "nfs": { + "minor_version": 1, + "opcode": "GETATTR", + "status": "NFSERR_NOENT", + "tag": "", + "version": 4 + }, + "rpc": { + "auth_flavor": "unix", + "call_size": 200, + "cred": { + "gid": 500, + "gids": [ + 491, + 499, + 500 + ], + "machinename": "localhost", + "stamp": 4597002, + "uid": 500 + }, + "reply_size": 96, + "status": "success", + "time": 25631000, + "time_str": "25.631ms", + "xid": "2cf0c876" + }, + "src": "127.0.0.1", + "src_port": 975, + "type": "nfs" +} + diff --git a/packetbeat/main.go b/packetbeat/main.go index 29282f99b7f..f10cf1de270 100644 --- a/packetbeat/main.go +++ b/packetbeat/main.go @@ -13,6 +13,7 @@ import ( _ "github.com/elastic/beats/packetbeat/protos/memcache" _ "github.com/elastic/beats/packetbeat/protos/mongodb" _ "github.com/elastic/beats/packetbeat/protos/mysql" + _ "github.com/elastic/beats/packetbeat/protos/nfs" _ "github.com/elastic/beats/packetbeat/protos/pgsql" _ "github.com/elastic/beats/packetbeat/protos/redis" _ "github.com/elastic/beats/packetbeat/protos/thrift" diff --git a/packetbeat/packetbeat.yml b/packetbeat/packetbeat.yml index 4af84f84754..be8b5421163 100644 --- a/packetbeat/packetbeat.yml +++ b/packetbeat/packetbeat.yml @@ -146,6 +146,11 @@ protocols: # the MongoDB protocol by commenting out the list of ports. ports: [27017] + nfs: + # Configure the ports where to listen for NFS traffic. You can disable + # the NFS protocol by commenting out the list of ports. + ports: [2049] + ############################# Processes ####################################### # Configure the processes to be monitored and how to find them. If a process is diff --git a/packetbeat/protos/nfs/README.md b/packetbeat/protos/nfs/README.md new file mode 100644 index 00000000000..2f47bae0a28 --- /dev/null +++ b/packetbeat/protos/nfs/README.md @@ -0,0 +1,53 @@ +NFS packetbeat +============== + +NFS v3 and v4 protocols parsing for packetbeat. + +Can be extended to handle other SunRPC based protocols as well. + +Sample output: +-------------- +```json +{ + "@timestamp": "2016-03-28T06:18:18.431Z", + "beat": { + "hostname": "localhost", + "name": "localhost" + }, + "count": 1, + "dst": "127.0.0.1", + "dst_port": 2049, + "nfs": { + "minor_version": 1, + "opcode": "GETATTR", + "status": "NFSERR_NOENT", + "tag": "", + "version": 4 + }, + "rpc": { + "auth_flavor": "unix", + "call_size": 200, + "cred": { + "gid": 500, + "gids": [ + 491, + 499, + 500 + ], + "machinename": "localhost", + "stamp": 4597002, + "uid": 500 + }, + "reply_size": 96, + "status": "success", + "time": 25631000, + "time_str": "25.631ms", + "xid": "2cf0c876" + }, + "src": "127.0.0.1", + "src_port": 975, + "type": "nfs" +} +``` + + diff --git a/packetbeat/protos/nfs/config.go b/packetbeat/protos/nfs/config.go new file mode 100644 index 00000000000..07d33edc355 --- /dev/null +++ b/packetbeat/protos/nfs/config.go @@ -0,0 +1,18 @@ +package nfs + +import ( + "github.com/elastic/beats/packetbeat/config" + "github.com/elastic/beats/packetbeat/protos" +) + +type rpcConfig struct { + config.ProtocolCommon `config:",inline"` +} + +var ( + defaultConfig = rpcConfig{ + ProtocolCommon: config.ProtocolCommon{ + TransactionTimeout: protos.DefaultTransactionExpiration, + }, + } +) diff --git a/packetbeat/protos/nfs/nfs.go b/packetbeat/protos/nfs/nfs.go new file mode 100644 index 00000000000..8b0411f87fb --- /dev/null +++ b/packetbeat/protos/nfs/nfs.go @@ -0,0 +1,42 @@ +package nfs + +import ( + "github.com/elastic/beats/libbeat/common" + "time" +) + +type Nfs struct { + xdr Xdr + vers uint32 + proc uint32 + event common.MapStr + ts time.Time +} + +func (nfs *Nfs) getRequestInfo() { + + nfsInfo := common.MapStr{} + nfsInfo["version"] = nfs.vers + + switch nfs.vers { + case 3: + nfsInfo["opcode"] = nfs.getV3Opcode() + case 4: + switch nfs.proc { + case 0: + nfsInfo["opcode"] = "NULL" + case 1: + tag := nfs.xdr.getDynamicOpaque() + nfsInfo["tag"] = string(tag) + nfsInfo["minor_version"] = nfs.xdr.getUInt() + nfsInfo["opcode"] = nfs.getV4Opcode() + } + } + nfs.event["nfs"] = nfsInfo +} + +func (nfs *Nfs) getReplyInfo(xdr *Xdr) { + nfsInfo := nfs.event["nfs"].(common.MapStr) + stat := int(xdr.getUInt()) + nfsInfo["status"] = NFS_STATUS[stat] +} diff --git a/packetbeat/protos/nfs/nfs3.go b/packetbeat/protos/nfs/nfs3.go new file mode 100644 index 00000000000..8dc971a6ae6 --- /dev/null +++ b/packetbeat/protos/nfs/nfs3.go @@ -0,0 +1,34 @@ +package nfs + +var nfs_opnum3 = [...]string{ + "NULL", + "GETATTR", + "SETATTR", + "LOOKUP", + "ACCESS", + "READLINK", + "READ", + "WRITE", + "CREATE", + "MKDIR", + "SYM_LINK", + "MKNODE", + "REMOVE", + "RMDIR", + "RENAME", + "LINK", + "READDIR", + "READDIRPLUS", + "FSSTAT", + "FSINFO", + "PATHINFO", + "COMMIT", +} + +func (nfs *Nfs) getV3Opcode() string { + if int(nfs.proc) < len(nfs_opnum3) { + return nfs_opnum3[nfs.proc] + } else { + return "ILLEGAL" + } +} diff --git a/packetbeat/protos/nfs/nfs4.go b/packetbeat/protos/nfs/nfs4.go new file mode 100644 index 00000000000..96579bf7416 --- /dev/null +++ b/packetbeat/protos/nfs/nfs4.go @@ -0,0 +1,241 @@ +package nfs + +import "fmt" + +const ( + OP_ACCESS = 3 + OP_CLOSE = 4 + OP_COMMIT = 5 + OP_CREATE = 6 + OP_DELEGPURGE = 7 + OP_DELEGRETURN = 8 + OP_GETATTR = 9 + OP_GETFH = 10 + OP_LINK = 11 + OP_LOCK = 12 + OP_LOCKT = 13 + OP_LOCKU = 14 + OP_LOOKUP = 15 + OP_LOOKUPP = 16 + OP_NVERIFY = 17 + OP_OPEN = 18 + OP_OPENATTR = 19 + OP_OPEN_CONFIRM = 20 + OP_OPEN_DOWNGRADE = 21 + OP_PUTFH = 22 + OP_PUTPUBFH = 23 + OP_PUTROOTFH = 24 + OP_READ = 25 + OP_READDIR = 26 + OP_READLINK = 27 + OP_REMOVE = 28 + OP_RENAME = 29 + OP_RENEW = 30 + OP_RESTOREFH = 31 + OP_SAVEFH = 32 + OP_SECINFO = 33 + OP_SETATTR = 34 + OP_SETCLIENTID = 35 + OP_SETCLIENTID_CONFIRM = 36 + OP_VERIFY = 37 + OP_WRITE = 38 + OP_RELEASE_LOCKOWNER = 39 + OP_BACKCHANNEL_CTL = 40 + OP_BIND_CONN_TO_SESSION = 41 + OP_EXCHANGE_ID = 42 + OP_CREATE_SESSION = 43 + OP_DESTROY_SESSION = 44 + OP_FREE_STATEID = 45 + OP_GET_DIR_DELEGATION = 46 + OP_GETDEVICEINFO = 47 + OP_GETDEVICELIST = 48 + OP_LAYOUTCOMMIT = 49 + OP_LAYOUTGET = 50 + OP_LAYOUTRETURN = 51 + OP_SECINFO_NO_NAME = 52 + OP_SEQUENCE = 53 + OP_SET_SSV = 54 + OP_TEST_STATEID = 55 + OP_WANT_DELEGATION = 56 + OP_DESTROY_CLIENTID = 57 + OP_RECLAIM_COMPLETE = 58 + OP_ILLEGAL = 10044 +) + +var nfs_opnum4 = map[int]string{ + 3: "ACCESS", + 4: "CLOSE", + 5: "COMMIT", + 6: "CREATE", + 7: "DELEGPURGE", + 8: "DELEGRETURN", + 9: "GETATTR", + 10: "GETFH", + 11: "LINK", + 12: "LOCK", + 13: "LOCKT", + 14: "LOCKU", + 15: "LOOKUP", + 16: "LOOKUPP", + 17: "NVERIFY", + 18: "OPEN", + 19: "OPENATTR", + 20: "OPEN_CONFIRM", + 21: "OPEN_DOWNGRADE", + 22: "PUTFH", + 23: "PUTPUBFH", + 24: "PUTROOTFH", + 25: "READ", + 26: "READDIR", + 27: "READLINK", + 28: "REMOVE", + 29: "RENAME", + 30: "RENEW", + 31: "RESTOREFH", + 32: "SAVEFH", + 33: "SECINFO", + 34: "SETATTR", + 35: "SETCLIENTID", + 36: "SETCLIENTID_CONFIRM", + 37: "VERIFY", + 38: "WRITE", + 39: "RELEASE_LOCKOWNER", + 40: "BACKCHANNEL_CTL", + 41: "BIND_CONN_TO_SESSION", + 42: "EXCHANGE_ID", + 43: "CREATE_SESSION", + 44: "DESTROY_SESSION", + 45: "FREE_STATEID", + 46: "GET_DIR_DELEGATION", + 47: "GETDEVICEINFO", + 48: "GETDEVICELIST", + 49: "LAYOUTCOMMIT", + 50: "LAYOUTGET", + 51: "LAYOUTRETURN", + 52: "SECINFO_NO_NAME", + 53: "SEQUENCE", + 54: "SET_SSV", + 55: "TEST_STATEID", + 56: "WANT_DELEGATION", + 57: "DESTROY_CLIENTID", + 58: "RECLAIM_COMPLETE", + 10044: "ILLEGAL", +} + +func (nfs *Nfs) eatData(op int) { + + switch op { + case OP_GETATTR: + nfs.xdr.getUIntVector() + case OP_GETFH: + // nothing to eat + case OP_LOOKUP: + nfs.xdr.getDynamicOpaque() + case OP_LOOKUPP: + // nothing to eat + case OP_NVERIFY: + nfs.xdr.getUIntVector() + nfs.xdr.getDynamicOpaque() + case OP_PUTFH: + nfs.xdr.getDynamicOpaque() + case OP_PUTPUBFH: + // nothing to eat + case OP_PUTROOTFH: + // nothing to eat + case OP_READLINK: + // nothing to eat + case OP_RENEW: + nfs.xdr.getUHyper() + case OP_RESTOREFH: + // nothing to eat + case OP_SAVEFH: + // nothing to eat + case OP_SECINFO: + nfs.xdr.getDynamicOpaque() + case OP_VERIFY: + nfs.xdr.getUIntVector() + nfs.xdr.getDynamicOpaque() + case OP_SEQUENCE: + nfs.xdr.getOpaque(16) + nfs.xdr.getUInt() + nfs.xdr.getUInt() + nfs.xdr.getUInt() + nfs.xdr.getUInt() + + } +} + +func (nfs *Nfs) getV4Opcode() string { + + // default op code + current_opname := "ILLEGAL" + + opcount := int(nfs.xdr.getUInt()) + for i := 0; i < opcount; i++ { + op := int(nfs.xdr.getUInt()) + opname, ok := nfs_opnum4[op] + + if !ok { + return fmt.Sprintf("ILLEGAL (%d)", op) + } + + current_opname = opname + + switch op { + // First class ops + // + // The first class ops usually the main operation in the compound. + // NFS spec allowes to build compound opertion where multiple + // first class ops are used, like OPEN->LOCK->WRITE->LOCKU->CLOSE, + // but such construnction are not used in the practice. + case + OP_ACCESS, + OP_BACKCHANNEL_CTL, + OP_BIND_CONN_TO_SESSION, + OP_CLOSE, + OP_COMMIT, + OP_CREATE, + OP_CREATE_SESSION, + OP_DELEGPURGE, + OP_DELEGRETURN, + OP_DESTROY_CLIENTID, + OP_DESTROY_SESSION, + OP_EXCHANGE_ID, + OP_FREE_STATEID, + OP_GETDEVICEINFO, + OP_GETDEVICELIST, + OP_GET_DIR_DELEGATION, + OP_LAYOUTCOMMIT, + OP_LAYOUTGET, + OP_LAYOUTRETURN, + OP_LINK, + OP_LOCK, + OP_LOCKT, + OP_LOCKU, + OP_OPEN, + OP_OPENATTR, + OP_OPEN_CONFIRM, + OP_OPEN_DOWNGRADE, + OP_READ, + OP_READDIR, + OP_READLINK, + OP_RECLAIM_COMPLETE, + OP_RELEASE_LOCKOWNER, + OP_REMOVE, + OP_RENAME, + OP_SECINFO_NO_NAME, + OP_SETATTR, + OP_SETCLIENTID, + OP_SETCLIENTID_CONFIRM, + OP_SET_SSV, + OP_TEST_STATEID, + OP_WANT_DELEGATION, + OP_WRITE: + + break + default: + nfs.eatData(op) + } + } + return current_opname +} diff --git a/packetbeat/protos/nfs/nfs_status.go b/packetbeat/protos/nfs/nfs_status.go new file mode 100644 index 00000000000..08746d84666 --- /dev/null +++ b/packetbeat/protos/nfs/nfs_status.go @@ -0,0 +1,112 @@ +package nfs + +var NFS_STATUS = map[int]string{ + 0: "NFS_OK", + 1: "NFSERR_PERM", + 2: "NFSERR_NOENT", + 5: "NFSERR_IO", + 6: "NFSERR_NXIO", + 13: "NFSERR_ACCESS", + 17: "NFSERR_EXIST", + 18: "NFSERR_XDEV", + 19: "NFSERR_NODEV", + 20: "NFSERR_NOTDIR", + 21: "NFSERR_ISDIR", + 22: "NFSERR_INVAL", + 27: "NFSERR_FBIG", + 28: "NFSERR_NOSPC", + 30: "NFSERR_ROFS", + 31: "NFSERR_MLINK", + 63: "NFSERR_NAMETOOLONG", + 66: "NFSERR_NOTEMPTY", + 69: "NFSERR_DQUOT", + 70: "NFSERR_STALE", + 71: "NFSERR_REMOTE", + 99: "NFSERR_WFLUSH", + 10001: "NFSERR_BADHANDLE", + 10002: "NFSERR_NOT_SYNC", + 10003: "NFSERR_BAD_COOKIE", + 10004: "NFSERR_NOTSUPP", + 10005: "NFSERR_TOOSMALL", + 10006: "NFSERR_SERVERFAULT", + 10007: "NFSERR_BADTYPE", + 10008: "NFSERR_DELAY", + 10009: "NFSERR_SAME", + 10010: "NFSERR_DENIED", + 10011: "NFSERR_EXPIRED", + 10012: "NFSERR_LOCKED", + 10013: "NFSERR_GRACE", + 10014: "NFSERR_FHEXPIRED", + 10015: "NFSERR_SHARE_DENIED", + 10016: "NFSERR_WRONGSEC", + 10017: "NFSERR_CLID_INUSE", + 10018: "NFSERR_RESOURCE", + 10019: "NFSERR_MOVED", + 10020: "NFSERR_NOFILEHANDLE", + 10021: "NFSERR_MINOR_VERS_MISMATCH", + 10022: "NFSERR_STALE_CLIENTID", + 10023: "NFSERR_STALE_STATEID", + 10024: "NFSERR_OLD_STATEID", + 10025: "NFSERR_BAD_STATEID", + 10026: "NFSERR_BAD_SEQID", + 10027: "NFSERR_NOT_SAME", + 10028: "NFSERR_LOCK_RANGE", + 10029: "NFSERR_SYMLINK", + 10030: "NFSERR_RESTOREFH", + 10031: "NFSERR_LEASE_MOVED", + 10032: "NFSERR_ATTRNOTSUPP", + 10033: "NFSERR_NO_GRACE", + 10034: "NFSERR_RECLAIM_BAD", + 10035: "NFSERR_RECLAIM_CONFLICT", + 10036: "NFSERR_BADXDR", + 10037: "NFSERR_LOCKS_HELD", + 10038: "NFSERR_OPENMODE", + 10039: "NFSERR_BADOWNER", + 10040: "NFSERR_BADCHAR", + 10041: "NFSERR_BADNAME", + 10042: "NFSERR_BAD_RANGE", + 10043: "NFSERR_LOCK_NOTSUPP", + 10044: "NFSERR_OP_ILLEGAL", + 10045: "NFSERR_DEADLOCK", + 10046: "NFSERR_FILE_OPEN", + 10047: "NFSERR_ADMIN_REVOKED", + 10048: "NFSERR_CB_PATH_DOWN", + 10049: "NFSERR_BADIOMODE", + 10050: "NFSERR_BADLAYOUT", + 10051: "NFSERR_BAD_SESSION_DIGEST", + 10052: "NFSERR_BADSESSION", + 10053: "NFSERR_BADSLOT", + 10054: "NFSERR_COMPLETE_ALREADY", + 10055: "NFSERR_CONN_NOT_BOUND_TO_SESSION", + 10056: "NFSERR_DELEG_ALREADY_WANTED", + 10057: "NFSERR_BACK_CHAN_BUSY", + 10058: "NFSERR_LAYOUTTRYLATER", + 10059: "NFSERR_LAYOUTUNAVAILABLE", + 10060: "NFSERR_NOMATCHING_LAYOUT", + 10061: "NFSERR_RECALLCONFLICT", + 10062: "NFSERR_UNKNOWN_LAYOUTTYPE", + 10063: "NFSERR_SEQ_MISORDERED", + 10064: "NFSERR_SEQUENCE_POS", + 10065: "NFSERR_REQ_TOO_BIG", + 10066: "NFSERR_REP_TOO_BIG", + 10067: "NFSERR_REP_TOO_BIG_TO_CACHE", + 10068: "NFSERR_RETRY_UNCACHED_REP", + 10069: "NFSERR_UNSAFE_COMPOUND", + 10070: "NFSERR_TOO_MANY_OPS", + 10071: "NFSERR_OP_NOT_IN_SESSION", + 10072: "NFSERR_HASH_ALG_UNSUPP", + 10073: "NFSERR_CONN_BINDING_NOT_ENFORCED", + 10074: "NFSERR_CLIENTID_BUSY", + 10075: "NFSERR_PNFS_IO_HOLE", + 10076: "NFSERR_SEQ_FALSE_RETRY", + 10077: "NFSERR_BAD_HIGH_SLOT", + 10078: "NFSERR_DEADSESSION", + 10079: "NFSERR_ENCR_ALG_UNSUPP", + 10080: "NFSERR_PNFS_NO_LAYOUT", + 10081: "NFSERR_NOT_ONLY_OP", + 10082: "NFSERR_WRONG_CRED", + 10083: "NFSERR_WRONG_TYPE", + 10084: "NFSERR_DIRDELEG_UNAVAIL", + 10085: "NFSERR_REJECT_DELEG", + 10086: "NFSERR_RETURNCONFLICT", +} diff --git a/packetbeat/protos/nfs/rpc.go b/packetbeat/protos/nfs/rpc.go new file mode 100644 index 00000000000..a229ff975cd --- /dev/null +++ b/packetbeat/protos/nfs/rpc.go @@ -0,0 +1,238 @@ +// Package rpc provides support for parsing RPC messages and reporting the +// results. This package supports the RPC v2 protocol as defined by RFC 5531 +// (RFC 1831). + +package nfs + +import ( + "encoding/binary" + "time" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" + + "github.com/elastic/beats/packetbeat/protos" + "github.com/elastic/beats/packetbeat/protos/tcp" + "github.com/elastic/beats/packetbeat/publish" +) + +var debugf = logp.MakeDebug("rpc") + +const ( + RPC_LAST_FRAG = 0x80000000 + RPC_SIZE_MASK = 0x7fffffff +) + +type RpcStream struct { + tcpTuple *common.TcpTuple + rawData []byte +} + +type rpcConnectionData struct { + Streams [2]*RpcStream +} + +type Rpc struct { + // Configuration data. + Ports []int + + transactionTimeout time.Duration + + results publish.Transactions // Channel where results are pushed. +} + +func init() { + protos.Register("nfs", New) +} + +func New( + testMode bool, + results publish.Transactions, + cfg *common.Config, +) (protos.Plugin, error) { + p := &Rpc{} + config := defaultConfig + if !testMode { + if err := cfg.Unpack(&config); err != nil { + logp.Warn("failed to read config") + return nil, err + } + } + + if err := p.init(results, &config); err != nil { + logp.Warn("failed to init") + return nil, err + } + return p, nil +} + +func (rpc *Rpc) init(results publish.Transactions, config *rpcConfig) error { + rpc.setFromConfig(config) + rpc.results = results + + return nil +} + +func (rpc *Rpc) setFromConfig(config *rpcConfig) error { + rpc.Ports = config.Ports + rpc.transactionTimeout = time.Duration(config.TransactionTimeout) * time.Second + return nil +} + +func (rpc *Rpc) GetPorts() []int { + return rpc.Ports +} + +// Called when TCP payload data is available for parsing. +func (rpc *Rpc) Parse( + pkt *protos.Packet, + tcptuple *common.TcpTuple, + dir uint8, + private protos.ProtocolData, +) protos.ProtocolData { + + defer logp.Recover("ParseRPC exception") + + conn := ensureRpcConnection(private) + + conn = rpc.handleRpcFragment(conn, pkt, tcptuple, dir) + if conn == nil { + return nil + } + return conn +} + +// Called when the FIN flag is seen in the TCP stream. +func (rpc *Rpc) ReceivedFin(tcptuple *common.TcpTuple, dir uint8, + private protos.ProtocolData) protos.ProtocolData { + + defer logp.Recover("ReceivedFinRpc exception") + + // forced by TCP interface + + // TODO + return private +} + +// Called when a packets are missing from the tcp +// stream. +func (rpc *Rpc) GapInStream(tcptuple *common.TcpTuple, dir uint8, + nbytes int, private protos.ProtocolData) (priv protos.ProtocolData, drop bool) { + + defer logp.Recover("GapInRpcStream exception") + + // forced by TCP interface + + // TODO + return private, false +} + +// ConnectionTimeout returns the per stream connection timeout. +// Return <=0 to set default tcp module transaction timeout. +func (rpc *Rpc) ConnectionTimeout() time.Duration { + // forced by TCP interface + return rpc.transactionTimeout +} + +func ensureRpcConnection(private protos.ProtocolData) *rpcConnectionData { + conn := getRpcConnection(private) + if conn == nil { + conn = &rpcConnectionData{} + } + return conn +} + +func getRpcConnection(private protos.ProtocolData) *rpcConnectionData { + if private == nil { + return nil + } + + priv, ok := private.(*rpcConnectionData) + if !ok { + logp.Warn("rpc connection data type error") + return nil + } + if priv == nil { + logp.Warn("Unexpected: rpc connection data not set") + return nil + } + + return priv +} + +// Parse function is used to process TCP payloads. +func (rpc *Rpc) handleRpcFragment( + conn *rpcConnectionData, + pkt *protos.Packet, + tcptuple *common.TcpTuple, + dir uint8, +) *rpcConnectionData { + + st := conn.Streams[dir] + if st == nil { + st = newStream(pkt, tcptuple) + conn.Streams[dir] = st + } else { + // concatenate bytes + st.rawData = append(st.rawData, pkt.Payload...) + if len(st.rawData) > tcp.TCP_MAX_DATA_IN_STREAM { + debugf("Stream data too large, dropping TCP stream") + conn.Streams[dir] = nil + return conn + } + } + + for len(st.rawData) > 0 { + + if len(st.rawData) < 4 { + debugf("Wainting for more data") + break + } + + marker := uint32(binary.BigEndian.Uint32(st.rawData[0:4])) + size := int(marker & RPC_SIZE_MASK) + islast := (marker & RPC_LAST_FRAG) != 0 + + if len(st.rawData)-4 < size { + debugf("Wainting for more data") + break + } + + if !islast { + logp.Warn("multifragment rpc message") + break + } + + xdr := Xdr{data: st.rawData[4 : 4+size], offset: 0} + msg := &RpcMessage{ts: pkt.Ts, xdr: xdr} + + src := common.Endpoint{ + Ip: tcptuple.Src_ip.String(), + Port: tcptuple.Src_port, + } + dst := common.Endpoint{ + Ip: tcptuple.Dst_ip.String(), + Port: tcptuple.Dst_port, + } + + event := common.MapStr{} + event["@timestamp"] = common.Time(pkt.Ts) + event["status"] = common.OK_STATUS // all packes are OK for now + event["src"] = &src + event["dst"] = &dst + + msg.fillEvent(event, rpc.results, size) + + st.rawData = st.rawData[4+size:] + } + + return conn +} + +func newStream(pkt *protos.Packet, tcptuple *common.TcpTuple) *RpcStream { + return &RpcStream{ + tcpTuple: tcptuple, + } +} + +// diff --git a/packetbeat/protos/nfs/rpc_message.go b/packetbeat/protos/nfs/rpc_message.go new file mode 100644 index 00000000000..09c4ed0e20e --- /dev/null +++ b/packetbeat/protos/nfs/rpc_message.go @@ -0,0 +1,119 @@ +package nfs + +import ( + "fmt" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/packetbeat/publish" + "time" +) + +const ( + RPC_CALL = 0 + RPC_REPLY = 1 +) + +const NFS_PROGRAM_NUMBER = 100003 + +type RpcMessage struct { + ts time.Time + xdr Xdr +} + +var ACCEPT_STATUS = [...]string{ + "success", + "prog_unavail", + "prog_mismatch", + "proc_unavail", + "garbage_args", + "system_err", +} + +var calls_seen = common.NewCache(1*time.Minute, 8192) + +func (msg *RpcMessage) fillEvent(event common.MapStr, results publish.Transactions, size int) { + + xid := fmt.Sprintf("%.8x", msg.xdr.getUInt()) + + msgType := msg.xdr.getUInt() + + if msgType == RPC_CALL { + + // eat rpc version number + msg.xdr.getUInt() + + rpcProg := msg.xdr.getUInt() + rpcProgVers := msg.xdr.getUInt() + rpcProc := msg.xdr.getUInt() + + if rpcProg == NFS_PROGRAM_NUMBER { + + // build event only if it's a nfs packet + rpcInfo := common.MapStr{} + rpcInfo["xid"] = xid + rpcInfo["call_size"] = size + + auth_flavor := msg.xdr.getUInt() + auth_opaque := msg.xdr.getDynamicOpaque() + switch auth_flavor { + case 0: + rpcInfo["auth_flavor"] = "none" + case 1: + rpcInfo["auth_flavor"] = "unix" + cred := common.MapStr{} + credXdr := Xdr{data: auth_opaque, offset: 0} + cred["stamp"] = credXdr.getUInt() + cred["machinename"] = credXdr.getString() + cred["uid"] = credXdr.getUInt() + cred["gid"] = credXdr.getUInt() + cred["gids"] = credXdr.getUIntVector() + rpcInfo["cred"] = cred + case 6: + rpcInfo["auth_flavor"] = "rpcsec_gss" + default: + rpcInfo["auth_flavor"] = fmt.Sprintf("unknown (%d)", auth_flavor) + } + + // eat auth verifier + msg.xdr.getUInt() + msg.xdr.getDynamicOpaque() + + event["type"] = "nfs" + event["rpc"] = rpcInfo + nfs := Nfs{ts: msg.ts, event: event, xdr: msg.xdr, vers: rpcProgVers, proc: rpcProc} + nfs.getRequestInfo() + + // populate cache to trach request processing time + calls_seen.Put(xid, &nfs) + } + + } else { + replyStatus := msg.xdr.getUInt() + // we are interesed only in accepted rpc reply + if replyStatus != 0 { + return + } + + // eat auth verifier + msg.xdr.getUInt() + msg.xdr.getDynamicOpaque() + + // get cached request + v := calls_seen.Delete(xid) + if v != nil { + nfs := *(v.(*Nfs)) + rpcInfo := nfs.event["rpc"].(common.MapStr) + rpcInfo["reply_size"] = size + rpcTime := msg.ts.Sub(nfs.ts) + rpcInfo["time"] = rpcTime + // the same in human readable form + rpcInfo["time_str"] = fmt.Sprintf("%v", rpcTime) + acceptStatus := int(msg.xdr.getUInt()) + rpcInfo["status"] = ACCEPT_STATUS[acceptStatus] + // populate nfs info for seccessfully executed requests + if acceptStatus == 0 { + nfs.getReplyInfo(&msg.xdr) + } + results.PublishTransaction(nfs.event) + } + } +} diff --git a/packetbeat/protos/nfs/xdr.go b/packetbeat/protos/nfs/xdr.go new file mode 100644 index 00000000000..6a58b8f4091 --- /dev/null +++ b/packetbeat/protos/nfs/xdr.go @@ -0,0 +1,59 @@ +package nfs + +import ( + "encoding/binary" +) + +type Xdr struct { + data []byte + offset uint32 +} + +func (xdr *Xdr) getInt() int32 { + i := int32(binary.BigEndian.Uint32(xdr.data[xdr.offset : xdr.offset+4])) + xdr.offset += 4 + return int32(i) +} + +func (xdr *Xdr) getUInt() uint32 { + i := uint32(binary.BigEndian.Uint32(xdr.data[xdr.offset : xdr.offset+4])) + xdr.offset += 4 + return i +} + +func (xdr *Xdr) getHyper() int64 { + i := int64(binary.BigEndian.Uint64(xdr.data[xdr.offset : xdr.offset+8])) + xdr.offset += 8 + return i +} + +func (xdr *Xdr) getUHyper() uint64 { + i := uint64(binary.BigEndian.Uint64(xdr.data[xdr.offset : xdr.offset+8])) + xdr.offset += 8 + return i +} + +func (xdr *Xdr) getString() string { + return string(xdr.getDynamicOpaque()) +} + +func (xdr *Xdr) getOpaque(length uint32) []byte { + padding := (4 - (length & 3)) & 3 + b := xdr.data[xdr.offset : xdr.offset+length] + xdr.offset += length + padding + return b +} + +func (xdr *Xdr) getDynamicOpaque() []byte { + l := xdr.getUInt() + return xdr.getOpaque(l) +} + +func (xdr *Xdr) getUIntVector() []uint32 { + l := xdr.getUInt() + v := make([]uint32, int(l)) + for i := 0; i < len(v); i++ { + v[i] = xdr.getUInt() + } + return v +} diff --git a/packetbeat/protos/nfs/xdr_test.go b/packetbeat/protos/nfs/xdr_test.go new file mode 100644 index 00000000000..cc8bcc9ad55 --- /dev/null +++ b/packetbeat/protos/nfs/xdr_test.go @@ -0,0 +1,26 @@ +package nfs + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +var test_msg = []byte{ + 0x80, 0x00, 0x00, 0xe0, + 0xb5, 0x49, 0x21, 0xab, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, + 0x00, 0x00, 0x00, 0x04, + + 0x00, 0x00, 0x00, 0x0b, + 0x74, 0x65, 0x73, 0x74, 0x20, 0x73, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x00, +} + +func TestXdrDecoding(t *testing.T) { + xdr := Xdr{data: test_msg, offset: 0} + + assert.Equal(t, uint32(0x800000e0), uint32(xdr.getUInt())) + assert.Equal(t, uint32(0xb54921ab), uint32(xdr.getUInt())) + assert.Equal(t, uint64(2), uint64(xdr.getUHyper())) + assert.Equal(t, uint32(4), uint32(xdr.getUInt())) + assert.Equal(t, "test string", xdr.getString()) +} diff --git a/packetbeat/tests/system/config/packetbeat.yml.j2 b/packetbeat/tests/system/config/packetbeat.yml.j2 index 04eebd19deb..143551aa46c 100644 --- a/packetbeat/tests/system/config/packetbeat.yml.j2 +++ b/packetbeat/tests/system/config/packetbeat.yml.j2 @@ -126,6 +126,9 @@ protocols: {% if redis_send_request %} send_request: true{% endif %} {% if redis_send_response %} send_response: true{% endif %} + nfs: + ports: [{{ nfs_ports|default([2049])|join(", ") }}] + thrift: ports: [{{ thrift_ports|default([9090])|join(", ") }}] transport_type: "{{ thrift_transport_type|default('socket') }}" diff --git a/packetbeat/tests/system/pcaps/nfs_v3.pcap b/packetbeat/tests/system/pcaps/nfs_v3.pcap new file mode 100644 index 0000000000000000000000000000000000000000..b40d64daf5a79285c8ede7c920df377021c68e77 GIT binary patch literal 1308 zcmb`HO>7cT5XWa(sDP44q=d94z9Y3t5eTWVCLk&)MPpErlLYx_Nhy#8Dp-sgQG+p! z9y}Qj_>riIF?!LY2NSRM(&$lRdeYQG4<1Ze#xD&k=nqWNO2&U`Ms4tm{(T)DKz>#C3YlYSrQVH}Rv=E~_*nK>`yg=}Re zCCrX6kIU`yFz4C$X%@@Qv-8E-V(D^`<%H54tI3NIECW=N>LnBpxT6lQLJ7cxcTAYy z31)zDNvJZ9+jkgX*FVmLuSok1pkJW{$`KCOqW)U&6~<&*8Q}y#$660SU||2ZYj_|s zK5XQ;>4g{9Lq~Rq4)4b1eN1vjv+eeoNn88NZxLXo^Pr{{&;fVr3GT|H>Cp&kFPvPq zQ!Z7^qw9(8X*JA!?5f|-h$r-+6?~5|nN||d>)2c3 zS-Y$97?FrrxasDg=-?+dpZ0hn#3TaL6`t+qQ#Yr)4j|Y?q^z2hJ*Fi|eh-FLC82sg z{^Q)4=_|LI5$MSPfJwAtgtLTU?gGM60tK5y?F^tKylK6bq2Jwwfs(f<4nuEyL z5_e<2oLl%J^4?VAtFjr@SYkJWOSmS}N~-bFdPSOvmA*A-COTVdhyc|`{bz#a;(yMB zshX`+lev6BZUiwATN{l;Z0C}9f;o)Iw365ov3JsZd{^0&`Do#MEK2hc?XgYad_+V5 zrm*#!4`l$TKAl6`$05&p=r4U-kop)_&!O~j5&QUTTOTWUjwyL>1J)>@kq(R*pxsD( z=J10Hm1@OmR;aMDP@2zYvKe-%!7}m>N{8PSJ!|$o{KPA<9pBSqOlKw;df%SBrW$_$ D8yDJK literal 0 HcmV?d00001 diff --git a/packetbeat/tests/system/pcaps/nfs_v4.pcap b/packetbeat/tests/system/pcaps/nfs_v4.pcap new file mode 100644 index 0000000000000000000000000000000000000000..85a4bd3f4c0a2dfefd211e8b9fb76953f3a6e356 GIT binary patch literal 1420 zcmb_cF=$g!6umEvO&S}L!3JrUAPSXWNPlBd=@6?a6co3jb|@i<@F8vDH-&^M{zY+; zPNLuvbdWAp0wP5_DX4U)=-|}BNf2C;LI03=&-?G`S7LDRelG9b`)=-i=e$Eg;czHT zL}#ZHqk`A2GCzeWTQIBnTqSNKQ%Qr*V5mjid9@Px$;V%*OfY2} z`Z&=rjnNr%d8HOlC;2-vnVLvm${Ul(+Oes{DRgxE4f1*r&Dn08%k0~|{o3+z4EI`) zvP5m?RE9ExC%3Nih2XE+L6N9mOwZL{%-Z|Kt`bOW=ojYBNV`o%?)Qmm>!}+5B$Y1svh7y{s2ri510t@B*{O4a_iw3wVI5fE*j% z{?1`zWrp`~uSr~K=QME5T+_JJ+XLIC9p$$l*SH=ZU|f#d8#AUAF3fnd@u>mPOW}Kl zV1^iWvQl3Ud(L2eb@>-Acg+Z9*hC*VrJwG9=__CrbA^Iw;eNRL-*g3VnqSoZi>^jd z_`Ah9loV;ratzMUUTXr}H7~RpyPSf1O}@`g?5fn=y5>&lZZlC|*k;|`>%9EQU34VY zjMzkXxeK_H!@V8V-pfNjy=fkL>CKB5bM!3q3lwYk%ewOCXjO-{UVTnHYc=}lb5`ne ZRpWQ{xv)J~-