diff --git a/CHANGELOG.md b/CHANGELOG.md
index 11d8b0cd0..f3a6fe5ce 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -12,6 +12,7 @@ CHANGELOG
 ### Configuration
 
 ### Core
+- `intelmq.lib.bot.SQLBot` was replaced by an SQLMixin in `intelmq.lib.mixins.SQLMixin`. The Generic DB Lookup Expert bot and the SQLOutput bot were updated accordingly.
 
 ### Development
 
diff --git a/intelmq/bots/experts/generic_db_lookup/expert.py b/intelmq/bots/experts/generic_db_lookup/expert.py
index 441ea6e1a..72a02de20 100644
--- a/intelmq/bots/experts/generic_db_lookup/expert.py
+++ b/intelmq/bots/experts/generic_db_lookup/expert.py
@@ -7,10 +7,11 @@
 Generic DB Lookup
 """
 
-from intelmq.lib.bot import SQLBot
+from intelmq.lib.bot import Bot
+from intelmq.lib.mixins import SQLMixin
 
 
-class GenericDBLookupExpertBot(SQLBot):
+class GenericDBLookupExpertBot(Bot, SQLMixin):
     """Fetche data from a database"""
     database: str = "intelmq"
     engine: str = ""
diff --git a/intelmq/bots/outputs/sql/output.py b/intelmq/bots/outputs/sql/output.py
index ae15842cc..b976b1920 100644
--- a/intelmq/bots/outputs/sql/output.py
+++ b/intelmq/bots/outputs/sql/output.py
@@ -13,10 +13,11 @@
 (missing fields etc).
 """
 
-from intelmq.lib.bot import SQLBot
+from intelmq.lib.bot import OutputBot
+from intelmq.lib.mixins import SQLMixin
 
 
-class SQLOutputBot(SQLBot):
+class SQLOutputBot(OutputBot, SQLMixin):
     """Send events to a PostgreSQL or SQLite database"""
     autocommit = True
     database = "intelmq-events"
diff --git a/intelmq/lib/bot.py b/intelmq/lib/bot.py
index 93d891d7b..84d4d5563 100644
--- a/intelmq/lib/bot.py
+++ b/intelmq/lib/bot.py
@@ -8,7 +8,6 @@
   * Bot: generic base class for all kind of bots
   * CollectorBot: base class for collectors
   * ParserBot: base class for parsers
-  * SQLBot: base class for any bots using SQL
 """
 import argparse
 import atexit
@@ -41,7 +40,7 @@
 from intelmq.lib.pipeline import PipelineFactory, Pipeline
 from intelmq.lib.utils import RewindableFileHandle, base64_decode
 
-__all__ = ['Bot', 'CollectorBot', 'ParserBot', 'SQLBot', 'OutputBot']
+__all__ = ['Bot', 'CollectorBot', 'ParserBot', 'OutputBot']
 
 
 class Bot(object):
@@ -1213,104 +1212,6 @@ def new_report(self):
         return libmessage.Report()
 
 
-class SQLBot(Bot):
-    """
-    Inherit this bot so that it handles DB connection for you.
-    You do not have to bother:
-        * connecting database in the self.init() method, just call super().init(), self.cur will be set
-        * catching exceptions, just call self.execute() instead of self.cur.execute()
-        * self.format_char will be set to '%s' in PostgreSQL and to '?' in SQLite
-    """
-
-    POSTGRESQL = "postgresql"
-    SQLITE = "sqlite"
-    default_engine = "postgresql"
-
-    def init(self):
-        self.engine_name = getattr(self, 'engine', self.default_engine).lower()
-        engines = {SQLBot.POSTGRESQL: (self._init_postgresql, "%s"),
-                   SQLBot.SQLITE: (self._init_sqlite, "?")}
-        for key, val in engines.items():
-            if self.engine_name == key:
-                val[0]()
-                self.format_char = val[1]
-                break
-        else:
-            raise ValueError("Wrong parameter 'engine' {0!r}, possible values are {1}".format(self.engine_name, engines))
-
-    def _connect(self, engine, connect_args: dict, autocommitable: bool = False):
-        self.engine = engine  # imported external library that connects to the DB
-        self.logger.debug("Connecting to database.")
-
-        try:
-            self.con = self.engine.connect(**connect_args)
-            if autocommitable:  # psycopg2 has it, sqlite3 has not
-                self.con.autocommit = getattr(self, 'autocommit', True)  # True prevents deadlocks
-            self.cur = self.con.cursor()
-        except (self.engine.Error, Exception):
-            self.logger.exception('Failed to connect to database.')
-            self.stop()
-        self.logger.info("Connected to database.")
-
-    def _init_postgresql(self):
-        try:
-            import psycopg2
-            import psycopg2.extras
-        except ImportError:
-            raise exceptions.MissingDependencyError("psycopg2")
-
-        self._connect(psycopg2,
-                      {"database": self.database,
-                       "user": self.user,
-                       "password": self.password,
-                       "host": self.host,
-                       "port": self.port,
-                       "sslmode": self.sslmode,
-                       "connect_timeout": getattr(self, 'connect_timeout', 5)
-                       },
-                      autocommitable=True)
-
-    def _init_sqlite(self):
-        try:
-            import sqlite3
-        except ImportError:
-            raise exceptions.MissingDependencyError("sqlite3")
-
-        self._connect(sqlite3,
-                      {"database": self.database,
-                       "timeout": getattr(self, 'connect_timeout', 5)
-                       }
-                      )
-
-    def execute(self, query: str, values: tuple, rollback=False):
-        try:
-            self.logger.debug('Executing %r.', (query, values))
-            # note: this assumes, the DB was created with UTF-8 support!
-            self.cur.execute(query, values)
-            self.logger.debug('Done.')
-        except (self.engine.InterfaceError, self.engine.InternalError,
-                self.engine.OperationalError, AttributeError):
-            if rollback:
-                try:
-                    self.con.rollback()
-                    self.logger.exception('Executed rollback command '
-                                          'after failed query execution.')
-                except self.engine.OperationalError:
-                    self.logger.exception('Executed rollback command '
-                                          'after failed query execution.')
-                    self.init()
-                except Exception:
-                    self.logger.exception('Cursor has been closed, connecting '
-                                          'again.')
-                    self.init()
-            else:
-                self.logger.exception('Database connection problem, connecting again.')
-                self.init()
-        else:
-            return True
-        return False
-
-
 class OutputBot(Bot):
     """
     Base class for outputs.
diff --git a/intelmq/lib/mixins/__init__.py b/intelmq/lib/mixins/__init__.py
index 5a28bf0c5..d33313548 100644
--- a/intelmq/lib/mixins/__init__.py
+++ b/intelmq/lib/mixins/__init__.py
@@ -4,5 +4,6 @@
 
 from intelmq.lib.mixins.http import HttpMixin
 from intelmq.lib.mixins.cache import CacheMixin
+from intelmq.lib.mixins.sql import SQLMixin
 
-__all__ = ['HttpMixin', 'CacheMixin']
+__all__ = ['HttpMixin', 'CacheMixin', 'SQLMixin']
diff --git a/intelmq/lib/mixins/sql.py b/intelmq/lib/mixins/sql.py
new file mode 100644
index 000000000..3ea21839b
--- /dev/null
+++ b/intelmq/lib/mixins/sql.py
@@ -0,0 +1,138 @@
+""" SQLMixin for IntelMQ
+
+SPDX-FileCopyrightText: 2021 Birger Schacht
+SPDX-License-Identifier: AGPL-3.0-or-later
+
+Based on the former SQLBot base class
+"""
+
+from intelmq.lib import exceptions
+
+
+class SQLMixin:
+    """
+    Inherit this mixin so that it handles the DB connection for you.
+    You do not have to bother:
+        * connecting to the database, this is done in __init__() and self.cur will be set
+        * catching exceptions, just call self.execute() instead of self.cur.execute()
+        * self.format_char will be set to '%s' in PostgreSQL and to '?' in SQLite
+    """
+
+    POSTGRESQL = "postgresql"
+    SQLITE = "sqlite"
+    default_engine = "postgresql"
+    engine = None
+    # overwrite the default value from the OutputBot
+    message_jsondict_as_string = True
+
+    def __init__(self, *args, **kwargs):
+        self._init_sql()
+
+        super().__init__()
+
+    def _init_sql(self):
+        """Connect (or reconnect) to the database selected by the 'engine' parameter."""
+        self.logger.debug("Running SQL Mixin initialization.")
+        # self.engine holds the configured engine name only until _connect()
+        # replaces it with the driver module, so keep the already resolved
+        # self.engine_name when this method runs again for a reconnect.
+        configured = getattr(self, 'engine', None)
+        if isinstance(configured, str):
+            self.engine_name = configured.lower()
+        elif getattr(self, 'engine_name', None) is None:
+            self.engine_name = self.default_engine  # parameter 'engine' is not set
+        engines = {SQLMixin.POSTGRESQL: (self._init_postgresql, "%s"),
+                   SQLMixin.SQLITE: (self._init_sqlite, "?")}
+        for key, val in engines.items():
+            if self.engine_name == key:
+                val[0]()
+                self.format_char = val[1]
+                break
+        else:
+            raise ValueError("Wrong parameter 'engine' {0!r}, possible values are {1}".format(self.engine_name, engines))
+
+    def _connect(self, engine, connect_args: dict, autocommitable: bool = False):
+        """Open connection self.con and cursor self.cur with the given driver module."""
+        self.engine = engine  # imported external library that connects to the DB
+        # never write the database password to the log
+        loggable_args = {key: ('HIDDEN' if key == 'password' else value)
+                         for key, value in connect_args.items()}
+        self.logger.debug("Connecting to database with connect_args: %r.", loggable_args)
+
+        try:
+            self.con = self.engine.connect(**connect_args)
+            if autocommitable:  # psycopg2 has it, sqlite3 has not
+                self.con.autocommit = getattr(self, 'autocommit', True)  # True prevents deadlocks
+            self.cur = self.con.cursor()
+        except Exception:  # covers self.engine.Error as well
+            self.logger.exception('Failed to connect to database.')
+            self.stop()
+        else:
+            self.logger.info("Connected to database.")
+
+    def _init_postgresql(self):
+        """Connect to PostgreSQL using psycopg2."""
+        try:
+            import psycopg2
+            import psycopg2.extras
+        except ImportError:
+            raise exceptions.MissingDependencyError("psycopg2")
+
+        self._connect(psycopg2,
+                      {"database": self.database,
+                       "user": self.user,
+                       "password": self.password,
+                       "host": self.host,
+                       "port": self.port,
+                       "sslmode": self.sslmode,
+                       "connect_timeout": getattr(self, 'connect_timeout', 5)
+                       },
+                      autocommitable=True)
+
+    def _init_sqlite(self):
+        """Connect to SQLite using sqlite3."""
+        try:
+            import sqlite3
+        except ImportError:
+            raise exceptions.MissingDependencyError("sqlite3")
+
+        self._connect(sqlite3,
+                      {"database": self.database,
+                       "timeout": getattr(self, 'connect_timeout', 5)
+                       }
+                      )
+
+    def execute(self, query: str, values: tuple, rollback=False):
+        """
+        Run query with values on self.cur.
+
+        Returns True on success and False on a database error. After an
+        error the connection is set up again, unless a rollback was
+        requested and succeeded.
+        """
+        try:
+            self.logger.debug('Executing %r.', (query, values))
+            # note: this assumes, the DB was created with UTF-8 support!
+            self.cur.execute(query, values)
+            self.logger.debug('Done.')
+        except (self.engine.InterfaceError, self.engine.InternalError,
+                self.engine.OperationalError, AttributeError):
+            if rollback:
+                try:
+                    self.con.rollback()
+                    self.logger.exception('Executed rollback command '
+                                          'after failed query execution.')
+                except self.engine.OperationalError:
+                    self.logger.exception('Executed rollback command '
+                                          'after failed query execution.')
+                    self._init_sql()
+                except Exception:
+                    self.logger.exception('Cursor has been closed, connecting '
+                                          'again.')
+                    self._init_sql()
+            else:
+                self.logger.exception('Database connection problem, connecting again.')
+                self._init_sql()
+        else:
+            return True
+        return False
+