Skip to content

Commit

Permalink
[Filebeat] Replace copy_from with templated value (#26631)
Browse files Browse the repository at this point in the history
* Replace copy_from with templated value

To ensure compatibility with Elasticsearch versions <7.13 this removes usage of `copy_from` in `set` processors.

Relates #26629

* panw: replace copy_from usage with script

Replaces the usage of a set processor with copy_from (ES 7.13+)
with a painless script that performs the same operation and it's
backwards compatible.

* cyberarkpas: Replace usage of copy_from with script

This updates the ID-mapping script to set fields instead of constructing
and op-list that is latter processed with foreach/set.

* Update CHANGELOG.next.asciidoc

Co-authored-by: Adrian Serrano <adrisr83@gmail.com>

Co-authored-by: Adrian Serrano <adrisr83@gmail.com>
(cherry picked from commit a7b0110)

# Conflicts:
#	x-pack/filebeat/module/panw/panos/ingest/pipeline.yml
#	x-pack/filebeat/module/threatintel/abuseurl/ingest/pipeline.yml
#	x-pack/filebeat/module/threatintel/anomali/ingest/pipeline.yml
#	x-pack/filebeat/module/threatintel/anomalithreatstream/ingest/pipeline.yml
#	x-pack/filebeat/module/threatintel/misp/ingest/pipeline.yml
#	x-pack/filebeat/module/threatintel/otx/ingest/pipeline.yml
#	x-pack/filebeat/module/threatintel/recordedfuture/ingest/pipeline.yml
#	x-pack/filebeat/module/zoom/webhook/ingest/meeting.yml
  • Loading branch information
andrewkroh authored and mergify-bot committed Jul 6, 2021
1 parent 9cedc14 commit a448ff8
Show file tree
Hide file tree
Showing 12 changed files with 873 additions and 22 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix bug in `httpjson` that prevented `first_event` getting updated. {pull}26407[26407]
- Fix bug in the Syslog input that misparsed rfc5424 days starting with 0. {pull}26419[26419]
- Do not close filestream harvester if an unexpected error is returned when close.on_state_change.* is enabled. {pull}26411[26411]
- Do not close filestream harvester if an unexpected error is returned when close.on_state_change.* is enabled. {pull}26411[26411]
- Fix Elasticsearch compatibility for modules that use `copy_from` in `set` processors. {issue}26629[26629]

*Filebeat*

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ processors:
path: elasticsearch.audit
- set:
field: http.request.id
copy_from: elasticsearch.audit.request.id
value: '{{{elasticsearch.audit.request.id}}}'
ignore_empty_value: true
- dot_expander:
field: cluster.name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ processors:
value: ""
- set:
field: http.request.id
copy_from: nginx.ingress_controller.http.request.id
value: '{{{nginx.ingress_controller.http.request.id}}}'
ignore_empty_value: true
ignore_failure: true
- script:
Expand Down
59 changes: 39 additions & 20 deletions x-pack/filebeat/module/cyberarkpas/audit/ingest/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -914,40 +914,59 @@ processors:
value: "success"
- set: event.reason
from: cyberarkpas.audit.reason
on_failure:
- append:
field: error.message
value: 'Failed to enrich based on ID #{{{ event.code }}}: {{{_ingest.on_failure_message}}}'
source: >
def clone(def val) {
return val instanceof List? new ArrayList(val) : val;
def clone(def ref) {
if (ref == null) return ref;
if (ref instanceof Map) {
ref = ref.entrySet().stream().collect(
Collectors.toMap(
e -> e.getKey(),
e -> clone(e.getValue())
)
);
} else if (ref instanceof List) {
ref = ref.stream().map(e -> clone(e)).collect(
Collectors.toList()
);
}
return ref;
}
def read_field(def map, String name) {
if (map == null || !(map instanceof Map)) return null;
int pos = name.indexOf(".");
return pos == -1? map[name]
: read_field(map[name.substring(0, pos)], name.substring(pos+1));
}
boolean set_field(Map map, String name, def value) {
int pos = name.indexOf(".");
if (pos == -1) {
map[name] = clone(value);
return true;
}
String key = name.substring(0, pos),
path = name.substring(pos+1);
if (!map.containsKey(key)) {
map[key] = new HashMap();
}
map = map[key];
return map instanceof Map? set_field(map, path, value)
: false;
}
String msgID = ctx.event?.code;
def actions = params.get(msgID);
if (actions == null) return;
List values = new ArrayList();
for (def item : actions) {
def val = item.value;
if (val == null && (val = read_field(ctx, item.from)) == null || val == "") continue;
values.add([
"to": item.set,
"value": clone(val)
]);
if (!set_field(ctx, item.set, val)) {
throw new Exception("Failed to set field " + item.set);
}
}
if (!values.isEmpty()) ctx._tmp["values"] = values;
- foreach:
field: _tmp.values
ignore_missing: true
processor:
set:
field: '{{{_ingest._value.to}}}'
copy_from: '_ingest._value.value'
ignore_empty_value: true
override: true
#
# Force event.outcome: unknown in case it gets a value other than one of the allowed.
Expand Down Expand Up @@ -994,7 +1013,7 @@ processors:
on_failure:
- set:
field: source.domain
copy_from: source.address
value: '{{{source.address}}}'
- convert:
field: destination.address
target_field: destination.ip
Expand All @@ -1003,7 +1022,7 @@ processors:
on_failure:
- set:
field: destination.domain
copy_from: destination.address
value: '{{{destination.address}}}'
#
# Populate related.ip
Expand Down
149 changes: 149 additions & 0 deletions x-pack/filebeat/module/panw/panos/ingest/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,7 @@ processors:
value:
- '{{panw.panos.network.nat.community_id}}'

<<<<<<< HEAD
- grok:
if: 'ctx?.panw?.panos?.threat?.name != null'
field: panw.panos.threat.name
Expand Down Expand Up @@ -611,6 +612,154 @@ processors:
- server.nat.ip
- server.nat.port
if: 'ctx?.destination?.nat?.ip == "0.0.0.0" && ctx?.destination?.nat?.port == 0'
=======
- set:
field: rule.name
value: "{{panw.panos.ruleset}}"
ignore_empty_value: true

# Set url and file values
- rename:
if: 'ctx?.panw?.panos?.sub_type != "url"'
field: url.original
target_field: file.name
ignore_missing: true

- grok:
field: url.original
patterns:
- '(%{ANY:url.scheme}\:\/\/)?(%{USERNAME:url.username}(\:%{PASSWORD:url.password})?\@)?%{DOMAIN:url.domain}(\:%{POSINT:url.port})?(%{PATH:url.path})?(\?%{QUERY:url.query})?(\#%{ANY:url.fragment})?'
ignore_missing: true
pattern_definitions:
USERNAME: '[^\:]*'
PASSWORD: '[^@]*'
DOMAIN: '[^\/\?#\:]*'
PATH: '[^\?#]*'
QUERY: '[^#]*'
ANY: '.*'
if: 'ctx?.url?.original != null && ctx?.url?.original != "-/" && ctx?.url?.original != ""'

- grok:
field: url.path
patterns:
- '%{FILENAME}((?:\.%{ANY})*(\.%{ANY:url.extension}))?'
ignore_missing: true
pattern_definitions:
FILENAME: '[^\.]+'
ANY: '.*'
if: 'ctx?.url?.path != null && ctx?.url?.path != ""'

- grok:
field: file.name
patterns:
- '%{FILENAME}((?:\.%{ANY})*(\.%{ANY:file.extension}))?'
ignore_missing: true
pattern_definitions:
FILENAME: '[^\.]+'
ANY: '.*'
if: 'ctx?.file?.name != null && ctx?.file?.name != ""'

- script:
lang: painless
description: Copy source.user to user
source: >
def clone(def ref) {
if (ref == null) return ref;
if (ref instanceof Map) {
ref = ref.entrySet().stream().collect(
Collectors.toMap(
e -> e.getKey(),
e -> clone(e.getValue())
)
);
} else if (ref instanceof List) {
ref = ref.stream().map(e -> clone(e)).collect(
Collectors.toList()
);
}
return ref;
}
def u = ctx?.source?.user;
if (u != null) {
ctx["user"] = clone(u);
}
- append:
field: related.user
allow_duplicates: false
value: "{{client.user.name}}"
if: "ctx?.client?.user?.name != null"

- append:
field: related.user
allow_duplicates: false
value: "{{source.user.name}}"
if: "ctx?.source?.user?.name != null"

- append:
field: related.user
allow_duplicates: false
value: "{{server.user.name}}"
if: "ctx?.server?.user?.name != null"

- append:
field: related.user
allow_duplicates: false
value: "{{destination.user.name}}"
if: "ctx?.destination?.user?.name != null"

- append:
field: related.user
allow_duplicates: false
value: "{{url.username}}"
if: "ctx?.url?.username != null && ctx?.url?.username != ''"

- append:
field: related.hash
allow_duplicates: false
value: "{{panw.panos.file.hash}}"
if: "ctx?.panw?.panos?.file?.hash != null"

- append:
field: related.hosts
allow_duplicates: false
value: "{{observer.hostname}}"
if: "ctx?.observer?.hostname != null && ctx.observer?.hostname != ''"

- append:
field: related.hosts
allow_duplicates: false
value: "{{host.name}}"
if: "ctx?.host?.name != null && ctx.host?.name != ''"

- append:
field: related.hosts
allow_duplicates: false
value: "{{url.domain}}"
if: "ctx?.url?.domain != null && ctx.url?.domain != ''"

# Remove temporary fields.
- remove:
field:
- _temp_
ignore_missing: true

# Remove NAT fields when translation was not done.
- remove:
field:
- source.nat.ip
- source.nat.port
- client.nat.ip
- client.nat.port
if: 'ctx?.source?.nat?.ip == "0.0.0.0" && ctx?.source?.nat?.port == 0'
- remove:
field:
- destination.nat.ip
- destination.nat.port
- server.nat.ip
- server.nat.port
if: 'ctx?.destination?.nat?.ip == "0.0.0.0" && ctx?.destination?.nat?.port == 0'
>>>>>>> a7b01105f ([Filebeat] Replace copy_from with templated value (#26631))

on_failure:
- set:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,18 @@ processors:
target_field: threatintel.indicator.url
keep_original: true
remove_if_successful: true
<<<<<<< HEAD
- rename:
field: threatintel.abuseurl.url
target_field: threatintel.indicator.url.full
ignore_missing: true
if: ctx?.threatintel?.indicator?.url?.original == null && ctx?.threatintel?.abuseurl?.url != null
=======
- set:
field: threatintel.indicator.url.full
value: '{{{threatintel.indicator.url.original}}}'
ignore_empty_value: true
>>>>>>> a7b01105f ([Filebeat] Replace copy_from with templated value (#26631))
- rename:
field: threatintel.abuseurl.host
target_field: threatintel.indicator.domain
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,18 @@ processors:
keep_original: true
remove_if_successful: true
if: ctx?.threatintel?.indicator?.type == 'url'
<<<<<<< HEAD
- rename:
field: _tmp.threatvalue
target_field: threatintel.indicator.url.full
ignore_missing: true
if: ctx?.threatintel?.indicator?.type == 'url' && ctx?.threatintel?.indicator?.url?.original == null
=======
- set:
field: threatintel.indicator.url.full
value: '{{{threatintel.indicator.url.original}}}'
ignore_empty_value: true
>>>>>>> a7b01105f ([Filebeat] Replace copy_from with templated value (#26631))
- rename:
field: _tmp.threatvalue
target_field: threatintel.indicator.email.address
Expand Down
Loading

0 comments on commit a448ff8

Please sign in to comment.