Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new bot: cut string from string #1965

Merged
12 commits merged into from
Sep 24, 2021
23 changes: 23 additions & 0 deletions docs/user/bots.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1838,6 +1838,29 @@ Public documentation: https://www.team-cymru.com/IP-ASN-mapping.html#dns
* `overwrite`: Overwrite existing fields. Default: `True` if not given (for backwards compatibility, will change in version 3.0.0)


.. _intelmq.bots.experts.remove_affix.expert:

RemoveAffix
^^^^^^^^^^^

**Information**

* `name:` `intelmq.bots.experts.remove_affix.expert`
* `lookup:` none
* `public:` yes
* `cache (redis db):` none
* `description:` Cut string from string

**Configuration Parameters**

* `remove_prefix`: True - cut from start, False - cut from end
* `affix`: example 'www.'
* `field`: example field 'source.fqdn'

**Description**
Remove part of string from string, example: `www.` from domains.


.. _intelmq.bots.experts.domain_suffix.expert:

Domain Suffix
Expand Down
Empty file.
41 changes: 41 additions & 0 deletions intelmq/bots/experts/remove_affix/expert.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# -*- coding: utf-8 -*-
"""
Remove Affix

SPDX-FileCopyrightText: 2021 Marius Karotkis <marius.karotkis@gmail.com>
SPDX-License-Identifier: AGPL-3.0-or-later
"""
from intelmq.lib.bot import Bot


class RemoveAffixExpertBot(Bot):
remove_prefix: bool = True # True - from start, False - from end
affix: str = 'www.'
field: str = 'source.fqdn'

def process(self):
event = self.receive_message()

if self.field in event:
if self.remove_prefix:
event.change(self.field, self.removeprefix(event[self.field], self.affix))
else:
event.change(self.field, self.removesuffix(event[self.field], self.affix))

self.send_message(event)
self.acknowledge_message()

def removeprefix(self, field: str, prefix: str) -> str:
if field.startswith(prefix):
return field[len(prefix):]
else:
return field[:]

def removesuffix(self, field: str, suffix: str) -> str:
if suffix and field.endswith(suffix):
return field[:-len(suffix)]
else:
return field[:]


BOT = RemoveAffixExpertBot
Empty file.
93 changes: 93 additions & 0 deletions intelmq/tests/bots/experts/remove_affix/test_expert.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
# -*- coding: utf-8 -*-
"""
Remove affix - String cut from string

SPDX-FileCopyrightText: 2021 Marius Karotkis <marius.karotkis@gmail.com>
SPDX-License-Identifier: AGPL-3.0-or-later
"""

import unittest
import intelmq.lib.test as test
from intelmq.bots.experts.remove_affix.expert import RemoveAffixExpertBot

EXAMPLE_INPUT = {
'__type': 'Event',
'feed.accuracy': 100.0,
'feed.name': 'MISP events',
'feed.provider': 'MISP BAE',
'time.observation': '2020-10-20T12:57:33+00:00',
'feed.url': 'https://sig01.threatreveal.com',
'source.fqdn': 'www.google.lt',
'extra.elastic_index': 'cti-2020-10',
'extra.elastic_id': 'VwVnSnUBXjJtaqsUSw8T'}

EXAMPLE_OUTPUT = {
'__type': 'Event',
'feed.accuracy': 100.0,
'feed.name': 'MISP events',
'feed.provider': 'MISP BAE',
'time.observation': '2020-10-20T12:57:33+00:00',
'feed.url': 'https://sig01.threatreveal.com',
'source.fqdn': 'google.lt',
'extra.elastic_index': 'cti-2020-10',
'extra.elastic_id': 'VwVnSnUBXjJtaqsUSw8T'}

EXAMPLE_OUTPUT1 = {
'__type': 'Event',
'feed.accuracy': 100.0,
'feed.name': 'MISP events',
'feed.provider': 'MISP BAE',
'time.observation': '2020-10-20T12:57:33+00:00',
'feed.url': 'https://sig01.threatreveal.com',
'source.fqdn': 'www.google',
'extra.elastic_index': 'cti-2020-10',
'extra.elastic_id': 'VwVnSnUBXjJtaqsUSw8T'}

EXAMPLE_INPUT_2 = {
'__type': 'Event',
'feed.accuracy': 100.0,
'feed.name': 'MISP events',
'feed.provider': 'MISP BAE',
'time.observation': '2020-10-20T12:57:33+00:00',
'feed.url': 'https://sig01.threatreveal.com',
'extra.elastic_index': 'cti-2020-10',
'extra.elastic_id': 'VwVnSnUBXjJtaqsUSw8T'}

EXAMPLE_OUTPUT_2 = {
'__type': 'Event',
'feed.accuracy': 100.0,
'feed.name': 'MISP events',
'feed.provider': 'MISP BAE',
'time.observation': '2020-10-20T12:57:33+00:00',
'feed.url': 'https://sig01.threatreveal.com',
'extra.elastic_index': 'cti-2020-10',
'extra.elastic_id': 'VwVnSnUBXjJtaqsUSw8T'}


class TestRemoveAffixExpertBot(test.BotTestCase, unittest.TestCase):
"""
A TestCase for TestRemoveAffixExpertBot.
"""

@classmethod
def set_bot(cls):
cls.bot_reference = RemoveAffixExpertBot

def test_event_cut_start(self):
self.input_message = EXAMPLE_INPUT
self.run_bot()
self.assertMessageEqual(0, EXAMPLE_OUTPUT)

def test_event_cut_without_field(self):
self.input_message = EXAMPLE_INPUT_2
self.run_bot()
self.assertMessageEqual(0, EXAMPLE_OUTPUT_2)

def test_event_cut_end(self):
self.input_message = EXAMPLE_INPUT
self.run_bot(parameters={"remove_prefix": False, "affix": ".lt"})
self.assertMessageEqual(0, EXAMPLE_OUTPUT1)


if __name__ == '__main__': # pragma: no cover
unittest.main()