Skip to content

Commit

Permalink
Implement SQLMixin
Browse files Browse the repository at this point in the history
This commit replaces the SQLBot with an SQLMixin, which lets us use
multiple inheritance with the bots.
  • Loading branch information
Birger Schacht committed Aug 19, 2021
1 parent ff7d835 commit 0751555
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 105 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ CHANGELOG
### Configuration

### Core
- `intelmq.lib.bot.SQLBot` was replaced by an SQLMixin in `intelmq.lib.mixins.SQLMixin`. The Generic DB Lookup Expert bot and the SQLOutput bot were updated accordingly.

### Development

Expand Down
5 changes: 3 additions & 2 deletions intelmq/bots/experts/generic_db_lookup/expert.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@
Generic DB Lookup
"""

from intelmq.lib.bot import SQLBot
from intelmq.lib.bot import Bot
from intelmq.lib.mixins import SQLMixin


class GenericDBLookupExpertBot(SQLBot):
class GenericDBLookupExpertBot(Bot, SQLMixin):
"""Fetche data from a database"""
database: str = "intelmq"
engine: str = "<postgresql OR sqlite>"
Expand Down
5 changes: 3 additions & 2 deletions intelmq/bots/outputs/sql/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@
(missing fields etc).
"""

from intelmq.lib.bot import SQLBot
from intelmq.lib.bot import OutputBot
from intelmq.lib.mixins import SQLMixin


class SQLOutputBot(SQLBot):
class SQLOutputBot(OutputBot, SQLMixin):
"""Send events to a PostgreSQL or SQLite database"""
autocommit = True
database = "intelmq-events"
Expand Down
101 changes: 1 addition & 100 deletions intelmq/lib/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
* Bot: generic base class for all kind of bots
* CollectorBot: base class for collectors
* ParserBot: base class for parsers
* SQLBot: base class for any bots using SQL
"""
import argparse
import atexit
Expand Down Expand Up @@ -41,7 +40,7 @@
from intelmq.lib.pipeline import PipelineFactory, Pipeline
from intelmq.lib.utils import RewindableFileHandle, base64_decode

__all__ = ['Bot', 'CollectorBot', 'ParserBot', 'SQLBot', 'OutputBot']
__all__ = ['Bot', 'CollectorBot', 'ParserBot', 'OutputBot']


class Bot(object):
Expand Down Expand Up @@ -1213,104 +1212,6 @@ def new_report(self):
return libmessage.Report()


class SQLBot(Bot):
"""
Inherit this bot so that it handles DB connection for you.
You do not have to bother:
* connecting database in the self.init() method, just call super().init(), self.cur will be set
* catching exceptions, just call self.execute() instead of self.cur.execute()
* self.format_char will be set to '%s' in PostgreSQL and to '?' in SQLite
"""

POSTGRESQL = "postgresql"
SQLITE = "sqlite"
default_engine = "postgresql"

def init(self):
self.engine_name = getattr(self, 'engine', self.default_engine).lower()
engines = {SQLBot.POSTGRESQL: (self._init_postgresql, "%s"),
SQLBot.SQLITE: (self._init_sqlite, "?")}
for key, val in engines.items():
if self.engine_name == key:
val[0]()
self.format_char = val[1]
break
else:
raise ValueError("Wrong parameter 'engine' {0!r}, possible values are {1}".format(self.engine_name, engines))

def _connect(self, engine, connect_args: dict, autocommitable: bool = False):
self.engine = engine # imported external library that connects to the DB
self.logger.debug("Connecting to database.")

try:
self.con = self.engine.connect(**connect_args)
if autocommitable: # psycopg2 has it, sqlite3 has not
self.con.autocommit = getattr(self, 'autocommit', True) # True prevents deadlocks
self.cur = self.con.cursor()
except (self.engine.Error, Exception):
self.logger.exception('Failed to connect to database.')
self.stop()
self.logger.info("Connected to database.")

def _init_postgresql(self):
try:
import psycopg2
import psycopg2.extras
except ImportError:
raise exceptions.MissingDependencyError("psycopg2")

self._connect(psycopg2,
{"database": self.database,
"user": self.user,
"password": self.password,
"host": self.host,
"port": self.port,
"sslmode": self.sslmode,
"connect_timeout": getattr(self, 'connect_timeout', 5)
},
autocommitable=True)

def _init_sqlite(self):
try:
import sqlite3
except ImportError:
raise exceptions.MissingDependencyError("sqlite3")

self._connect(sqlite3,
{"database": self.database,
"timeout": getattr(self, 'connect_timeout', 5)
}
)

def execute(self, query: str, values: tuple, rollback=False):
try:
self.logger.debug('Executing %r.', (query, values))
# note: this assumes, the DB was created with UTF-8 support!
self.cur.execute(query, values)
self.logger.debug('Done.')
except (self.engine.InterfaceError, self.engine.InternalError,
self.engine.OperationalError, AttributeError):
if rollback:
try:
self.con.rollback()
self.logger.exception('Executed rollback command '
'after failed query execution.')
except self.engine.OperationalError:
self.logger.exception('Executed rollback command '
'after failed query execution.')
self.init()
except Exception:
self.logger.exception('Cursor has been closed, connecting '
'again.')
self.init()
else:
self.logger.exception('Database connection problem, connecting again.')
self.init()
else:
return True
return False


class OutputBot(Bot):
"""
Base class for outputs.
Expand Down
3 changes: 2 additions & 1 deletion intelmq/lib/mixins/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@

from intelmq.lib.mixins.http import HttpMixin
from intelmq.lib.mixins.cache import CacheMixin
from intelmq.lib.mixins.sql import SQLMixin

__all__ = ['HttpMixin', 'CacheMixin']
__all__ = ['HttpMixin', 'CacheMixin', 'SQLMixin']
111 changes: 111 additions & 0 deletions intelmq/lib/mixins/sql.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
""" SQLMixin for IntelMQ
SPDX-FileCopyrightText: 2021 Birger Schacht
SPDX-License-Identifier: AGPL-3.0-or-later
Based on the former SQLBot base class
"""


class SQLMixin:
"""
Inherit this bot so that it handles DB connection for you.
You do not have to bother:
* connecting database in the self.init() method, just call super().init(), self.cur will be set
* catching exceptions, just call self.execute() instead of self.cur.execute()
* self.format_char will be set to '%s' in PostgreSQL and to '?' in SQLite
"""

POSTGRESQL = "postgresql"
SQLITE = "sqlite"
default_engine = "postgresql"
engine = None
# overwrite the default value from the OutputBot
message_jsondict_as_string = True

def __init__(self, *args, **kwargs):
self.logger.debug("Running SQL Mixin initialization.")
self.engine_name = getattr(self, 'engine', self.default_engine).lower()
engines = {SQLMixin.POSTGRESQL: (self._init_postgresql, "%s"),
SQLMixin.SQLITE: (self._init_sqlite, "?")}
for key, val in engines.items():
if self.engine_name == key:
val[0]()
self.format_char = val[1]
break
else:
raise ValueError("Wrong parameter 'engine' {0!r}, possible values are {1}".format(self.engine_name, engines))

super().__init__()

def _connect(self, engine, connect_args: dict, autocommitable: bool = False):
self.engine = engine # imported external library that connects to the DB
self.logger.debug(f"Connecting to database with connect_args: {connect_args}.")

try:
self.con = self.engine.connect(**connect_args)
if autocommitable: # psycopg2 has it, sqlite3 has not
self.con.autocommit = getattr(self, 'autocommit', True) # True prevents deadlocks
self.cur = self.con.cursor()
except (self.engine.Error, Exception):
self.logger.exception('Failed to connect to database.')
self.stop()
self.logger.info("Connected to database.")

def _init_postgresql(self):
try:
import psycopg2
import psycopg2.extras
except ImportError:
raise exceptions.MissingDependencyError("psycopg2")

self._connect(psycopg2,
{"database": self.database,
"user": self.user,
"password": self.password,
"host": self.host,
"port": self.port,
"sslmode": self.sslmode,
"connect_timeout": getattr(self, 'connect_timeout', 5)
},
autocommitable=True)

def _init_sqlite(self):
try:
import sqlite3
except ImportError:
raise exceptions.MissingDependencyError("sqlite3")

self._connect(sqlite3,
{"database": self.database,
"timeout": getattr(self, 'connect_timeout', 5)
}
)

def execute(self, query: str, values: tuple, rollback=False):
try:
self.logger.debug('Executing %r.', (query, values))
# note: this assumes, the DB was created with UTF-8 support!
self.cur.execute(query, values)
self.logger.debug('Done.')
except (self.engine.InterfaceError, self.engine.InternalError,
self.engine.OperationalError, AttributeError):
if rollback:
try:
self.con.rollback()
self.logger.exception('Executed rollback command '
'after failed query execution.')
except self.engine.OperationalError:
self.logger.exception('Executed rollback command '
'after failed query execution.')
self.init()
except Exception:
self.logger.exception('Cursor has been closed, connecting '
'again.')
self.init()
else:
self.logger.exception('Database connection problem, connecting again.')
self.init()
else:
return True
return False

0 comments on commit 0751555

Please sign in to comment.