Skip to content

Commit

Permalink
[Filebeat] Improve ECS categorization field mappings for nats module (e…
Browse files Browse the repository at this point in the history
…lastic#17550)

* Improve ECS categorization field mappings for nats module

- event.kind
- event.type
- related.ip

Closes elastic#16173

(cherry picked from commit eb5ba9a)
  • Loading branch information
leehinman committed Apr 16, 2020
1 parent 2ebd67d commit c391726
Show file tree
Hide file tree
Showing 5 changed files with 297 additions and 178 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Improve ECS categorization field mappings for mysql module. {issue}16172[16172] {pull}17491[17491]
- Release Google Cloud module as GA. {pull}17511[17511]
- Update filebeat httpjson input to support pagination via Header and Okta module. {pull}16354[16354]
- Improve ECS categorization field mappings for nats module. {issue}16173[16173] {pull}17550[17550]

*Heartbeat*

Expand Down
177 changes: 0 additions & 177 deletions filebeat/module/nats/log/ingest/pipeline.json

This file was deleted.

181 changes: 181 additions & 0 deletions filebeat/module/nats/log/ingest/pipeline.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
description: Pipeline for parsing nats log logs
processors:
- grok:
field: message
patterns:
- \[%{POSINT:process.pid}\]( %{NATSTIME:nats.log.timestamp})? \[%{NATSLOGLEVEL:log.level}\]
%{GREEDYDATA:nats.log.info}
pattern_definitions:
NATSTIME: '%{YEAR}/%{MONTHNUM}/%{MONTHDAY} %{TIME}'
NATSLOGLEVEL: (INF|DBG|WRN|ERR|FTL|TRC)
ignore_missing: true
- grok:
field: nats.log.info
patterns:
- '%{IPV4:client.ip}:%{POSINT:client.port} - cid:%{POSINT:nats.log.client.id}
- %{GREEDYDATA:nats.log.msg.info}'
- '%{GREEDYDATA:nats.log.msg.data}'
ignore_missing: true
- grok:
field: nats.log.msg.info
patterns:
- '%{NATSDIRECTION:network.direction} %{NATSPAYLOAD:nats.log.msg.type}: \[%{GREEDYDATA:nats.log.msg.payload}\]'
- '%{NATSDIRECTION:network.direction} \[%{NATSNOINFO:nats.log.msg.type}\]'
- '%{NATSDIRECTION:network.direction} \[%{NATSUNSUB:nats.log.msg.type}\s+%{POSINT:nats.log.msg.sid}(\s+%{POSINT:nats.log.msg.max_messages})?\]'
- '%{NATSDIRECTION:network.direction} \[%{NATSPUB:nats.log.msg.type}\s+%{NOTSPACE:nats.log.msg.subject}(\s+%{NOTSPACE:nats.log.msg.reply_to})?\s+%{POSINT:nats.log.msg.bytes}\]'
- '%{NATSDIRECTION:network.direction} \[%{NATSSUB:nats.log.msg.type}\s+%{NOTSPACE:nats.log.msg.subject}(\s+%{NOTSPACE:nats.log.msg.queue_group})?\s+%{POSINT:nats.log.msg.sid}\]'
- '%{NATSDIRECTION:network.direction} \[%{NATSMSG:nats.log.msg.type}\s+%{NOTSPACE:nats.log.msg.subject}\s+%{POSINT:nats.log.msg.sid}(\s+%{NOTSPACE:nats.log.msg.reply_to})?\s+%{POSINT:nats.log.msg.bytes}\]'
- '%{NATSDIRECTION:network.direction} \[%{NATSCONNECTION:nats.log.msg.type}\s+%{GREEDYDATA:nats.log.msg.data}\]'
- '%{NATSDIRECTION:network.direction} \[%{NATSERROR:nats.log.msg.type}\s+%{GREEDYDATA:nats.log.msg.error\]'
- '%{GREEDYDATA:nats.log.msg.data}'
pattern_definitions:
NATSDIRECTION: (<<-|->>)
NATSMSG: MSG
NATSPUB: PUB
NATSSUB: SUB
NATSUNSUB: UNSUB
NATSPAYLOAD: MSG_PAYLOAD
NATSERROR: -ERROR
NATSPING: PING
NATSPONG: PONG
NATSOK: OK
NATSCONNECT: CONNECT
NATSINFO: INFO
NATSCONNECTION: (?:%{NATSCONNECT}|%{NATSINFO})
NATSNOINFO: (?:%{NATSPING}|%{NATSPONG}|%{NATSOK})
ignore_missing: true
- remove:
field: nats.log.info
- remove:
field: nats.log.msg.info
ignore_missing: true
- remove:
field: nats.log.msg.payload
ignore_missing: true
- remove:
field: message
- rename:
field: nats.log.msg.data
target_field: message
ignore_missing: true
- script:
lang: painless
source: |-
if (ctx.log.level == params.inf) {
ctx.log.level = params.info;
} else if (ctx.log.level == params.dbg) {
ctx.log.level = params.debug;
} else if (ctx.log.level == params.wrn) {
ctx.log.level = params.warning;
} else if (ctx.log.level == params.err) {
ctx.log.level = params.error;
} else if (ctx.log.level == params.ftl) {
ctx.log.level = params.fatal;
} else if (ctx.log.level == params.trc) {
ctx.log.level = params.trace;
}
params:
inf: INF
info: info
dbg: DBG
debug: debug
wrn: WRN
warning: warning
err: ERR
error: error
ftl: FTL
fatal: fatal
trc: TRC
trace: trace
- script:
lang: painless
source: |-
if (ctx.nats.log.msg.type == params.msg) {
ctx.nats.log.msg.type = params.message;
} else if (ctx.nats.log.msg.type == params.pub) {
ctx.nats.log.msg.type = params.publish;
} else if (ctx.nats.log.msg.type == params.sub) {
ctx.nats.log.msg.type = params.subscribe;
} else if (ctx.nats.log.msg.type == params.unsub) {
ctx.nats.log.msg.type = params.unsubscribe;
} else if (ctx.nats.log.msg.type == params.msg_payload) {
ctx.nats.log.msg.type = params.payload;
} else if (ctx.nats.log.msg.type == params.err) {
ctx.nats.log.msg.type = params.error;
} else if (ctx.nats.log.msg.type == params.pi) {
ctx.nats.log.msg.type = params.ping;
} else if (ctx.nats.log.msg.type == params.po) {
ctx.nats.log.msg.type = params.pong;
} else if (ctx.nats.log.msg.type == params.ok) {
ctx.nats.log.msg.type = params.acknowledge;
} else if (ctx.nats.log.msg.type == params.connect) {
ctx.nats.log.msg.type = params.connection;
} else if (ctx.nats.log.msg.type == params.info) {
ctx.nats.log.msg.type = params.information;
}
params:
msg: MSG
message: message
pub: PUB
publish: publish
sub: SUB
subscribe: subscribe
unsub: UNSUB
unsubscribe: unsubscribe
msg_payload: MSG_PAYLOAD
payload: payload
err: -ERROR
error: error
pi: PING
ping: ping
po: PONG
pong: pong
ok: OK
acknowledge: acknowledge
connect: CONNECT
connection: connection
info: INFO
information: information
if: ctx.nats.log.msg?.type != null
- script:
lang: painless
source: |-
if (ctx.network.direction == params.in) {
ctx.network.direction = params.inbound;
} else if (ctx.network.direction == params.out) {
ctx.network.direction = params.outbound;
}
params:
in: <<-
inbound: inbound
out: ->>
outbound: outbound
if: ctx.network?.direction != null
- rename:
field: '@timestamp'
target_field: event.created
- date:
field: nats.log.timestamp
target_field: '@timestamp'
formats:
- yyyy/MM/dd HH:mm:ss.SSSSSS
- remove:
field: nats.log.timestamp
- set:
field: event.kind
value: event
- append:
field: event.type
value: info
- append:
field: event.type
value: error
if: "ctx?.log?.level != null && (ctx.log.level == 'error' || ctx.log.level == 'fatal')"
- append:
field: related.ip
value: "{{client.ip}}"
if: "ctx?.client?.ip != null"
on_failure:
- set:
field: error.message
value: '{{ _ingest.on_failure_message }}'
2 changes: 1 addition & 1 deletion filebeat/module/nats/log/manifest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ var:
# See more on https://nats.io/documentation/server/gnatsd-logging/
- /var/log/nats/nats.log*

ingest_pipeline: ingest/pipeline.json
ingest_pipeline: ingest/pipeline.yml
input: config/log.yml
Loading

0 comments on commit c391726

Please sign in to comment.