Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix deadlock where opening database fails #107

Merged
merged 10 commits into from
Jan 30, 2021
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

- Do not create tables when in read-only mode (PR [#128](https://github.com/RaRe-Technologies/sqlitedict/pull/128), [@hholst80](https://github.com/hholst80))
- Use tempfile.mkstemp for safer temp file creation (PR [#106](https://github.com/RaRe-Technologies/sqlitedict/pull/106), [@ergoithz](https://github.com/ergoithz))
- Fix deadlock where opening database fails (PR [#107](https://github.com/RaRe-Technologies/sqlitedict/pull/107), [@padelt](https://github.com/padelt))

## 1.7.0, 04/09/2018

Expand Down
68 changes: 56 additions & 12 deletions sqlitedict.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import sys
import tempfile
import logging
import time
import traceback

from threading import Thread
Expand All @@ -56,6 +57,9 @@ def exec_(_code_, _globs_=None, _locs_=None):
_locs_ = _globs_
exec("""exec _code_ in _globs_, _locs_""")

class TimeoutError(OSError):
pass

exec_("def reraise(tp, value, tb=None):\n"
" raise tp, value, tb\n")
else:
Expand Down Expand Up @@ -105,7 +109,7 @@ class SqliteDict(DictClass):
VALID_FLAGS = ['c', 'r', 'w', 'n']

def __init__(self, filename=None, tablename='unnamed', flag='c',
autocommit=False, journal_mode="DELETE", encode=encode, decode=decode):
autocommit=False, journal_mode="DELETE", encode=encode, decode=decode, timeout=5):
"""
Initialize a thread-safe sqlite-backed dictionary. The dictionary will
be a table `tablename` in database file `filename`. A single file (=database)
Expand Down Expand Up @@ -136,6 +140,8 @@ def __init__(self, filename=None, tablename='unnamed', flag='c',
object.
The default is to use pickle.

The `timeout` defines the maximum time (in seconds) to wait for initial Thread startup.

"""
self.in_temp = filename is None
if self.in_temp:
Expand Down Expand Up @@ -165,6 +171,7 @@ def __init__(self, filename=None, tablename='unnamed', flag='c',
self.journal_mode = journal_mode
self.encode = encode
self.decode = decode
self.timeout = timeout

logger.info("opening Sqlite table %r in %r" % (tablename, filename))
self.conn = self._new_conn()
Expand All @@ -180,7 +187,8 @@ def __init__(self, filename=None, tablename='unnamed', flag='c',
self.clear()

def _new_conn(self):
return SqliteMultithread(self.filename, autocommit=self.autocommit, journal_mode=self.journal_mode)
return SqliteMultithread(self.filename, autocommit=self.autocommit, journal_mode=self.journal_mode,
timeout=self.timeout)

def __enter__(self):
if not hasattr(self, 'conn') or self.conn is None:
Expand Down Expand Up @@ -381,7 +389,7 @@ class SqliteMultithread(Thread):
in a separate thread (in the same order they arrived).

"""
def __init__(self, filename, autocommit, journal_mode):
def __init__(self, filename, autocommit, journal_mode, timeout):
super(SqliteMultithread, self).__init__()
self.filename = filename
self.autocommit = autocommit
Expand All @@ -390,19 +398,34 @@ def __init__(self, filename, autocommit, journal_mode):
self.reqs = Queue()
self.setDaemon(True) # python2.5-compatible
self.exception = None
self._sqlitedict_thread_initialized = None
self.timeout = timeout
self.log = logging.getLogger('sqlitedict.SqliteMultithread')
self.start()

def run(self):
if self.autocommit:
conn = sqlite3.connect(self.filename, isolation_level=None, check_same_thread=False)
else:
conn = sqlite3.connect(self.filename, check_same_thread=False)
conn.execute('PRAGMA journal_mode = %s' % self.journal_mode)
conn.text_factory = str
cursor = conn.cursor()
conn.commit()
cursor.execute('PRAGMA synchronous=OFF')
try:
if self.autocommit:
conn = sqlite3.connect(self.filename, isolation_level=None, check_same_thread=False)
else:
conn = sqlite3.connect(self.filename, check_same_thread=False)
except Exception:
self.log.exception("Failed to initialize connection for filename: %s" % self.filename)
self.exception = sys.exc_info()
raise

try:
conn.execute('PRAGMA journal_mode = %s' % self.journal_mode)
conn.text_factory = str
cursor = conn.cursor()
conn.commit()
cursor.execute('PRAGMA synchronous=OFF')
except Exception:
self.log.exception("Failed to execute PRAGMA statements.")
self.exception = sys.exc_info()
raise

self._sqlitedict_thread_initialized = True

res = None
while True:
Expand Down Expand Up @@ -488,6 +511,7 @@ def execute(self, req, arg=None, res=None):
"""
`execute` calls are non-blocking: just queue up the request and return immediately.
"""
self._wait_for_initialization()
self.check_raise_error()

# NOTE: This might be a lot of information to pump into an input
Expand Down Expand Up @@ -552,6 +576,26 @@ def close(self, force=False):
self.select_one('--close--')
self.join()

def _wait_for_initialization(self):
"""
Polls the 'initialized' flag to be set by the started Thread in run().
"""
# A race condition may occur without waiting for initialization:
# __init__() finishes with the start() call, but the Thread needs some time to actually start working.
# If opening the database file fails in run(), an exception will occur and self.exception will be set.
# But if we run check_raise_error() before run() had a chance to set self.exception, it will report
# a false negative: An exception occured and the thread terminates but self.exception is unset.
# This leads to a deadlock while waiting for the results of execute().
# By waiting for the Thread to set the initialized flag, we can ensure the thread has successfully
# opened the file - and possibly set self.exception to be detected by check_raise_error().

start_time = time.time()
while time.time() - start_time < self.timeout:
if self._sqlitedict_thread_initialized or self.exception:
return
time.sleep(0.1)
raise TimeoutError("SqliteMultithread failed to flag initialization withing %0.0f seconds." % self.timeout)


if __name__ == '__main__':
print(__version__)