Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-44462: Try to trap case where another eups process deletes the db file #152

Merged
merged 5 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions python/eups/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,9 +338,8 @@ def printUses(outstrm, productName, versionName=None, eupsenv=None,
else:
usesInfo = eupsenv.uses()
if pickleFile:
fd = utils.AtomicFile(pickleFile, "wb")
pickle.dump(usesInfo, fd, protocol=2)
fd.close()
with utils.AtomicFile(pickleFile, "wb") as fd:
pickle.dump(usesInfo, fd, protocol=2)

userList = eupsenv.uses(productName, versionName, depth, usesInfo=usesInfo)

Expand Down Expand Up @@ -967,4 +966,3 @@ def enableLocking(enableLocking=True):
"""Enable or disable the use of lock files"""
from . import lock
lock.disableLocking = not enableLocking

31 changes: 18 additions & 13 deletions python/eups/stack/ProductStack.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
# considered a global tag.
userPrefix = "user:"

dotre = re.compile(r'\.')
persistVersionNameNoDot = persistVersionName.replace(".", "_")
who = utils.getUserName()

class ProductStack:
Expand Down Expand Up @@ -47,13 +47,13 @@ class ProductStack:
persistVersion = persistVersionName

# static variable: name of file extension to use to persist data
persistFileExt = "pickleDB%s" % dotre.sub('_', persistVersionName)
persistFileExt = f"pickleDB{persistVersionNameNoDot}"

# static variable: regexp for cache file names
persistFileRe = re.compile(r'^(\w\S*)\.%s$' % persistFileExt)
persistFileRe = re.compile(rf'^(\w\S*)\.{persistFileExt}$')

# static variable: name of file extension to use to persist data
userTagFileExt = "pickleTag%s" % dotre.sub('_', persistVersionName)
userTagFileExt = f"pickleTag{persistVersionNameNoDot}"

def __init__(self, dbpath, persistDir=None, autosave=True):
"""
Expand Down Expand Up @@ -252,8 +252,15 @@ def save(self, flavors=None, dir=None):
raise CacheOutOfSync(outofsync)

def _cacheFileIsInSync(self, file):
return (file not in self.modtimes or
os.stat(file).st_mtime <= self.modtimes[file])
if file not in self.modtimes:
return True
try:
older = os.stat(file).st_mtime <= self.modtimes[file]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There seem to be a number of similar os.stat() calls. They don't need protection as well?

It's curious that the only thing that seems to remove this file is an admin command.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will take a look at the other places.

I was also wondering how the file disappears even in the case where 30 EUPS processes in parallel are trying to read the same database file. The atomic rename should not result in the file disappearing so I realize that this is papering over the cracks a little. @RobertLuptonTheGood do you know of a way that the pickle file could be deleted in normal usage?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a couple more traps but reload is a problem.

except FileNotFoundError:
# File must have been deleted by other eups process.
del self.modtimes[file]
return True
return older

def cacheIsInSync(self, flavors=None):
"""
Expand Down Expand Up @@ -300,9 +307,9 @@ def persist(self, flavor, file=None):
self.lookup[flavor] = {}
flavorData = self.lookup[flavor]

fd = utils.AtomicFile(file, "wb")
pickle.dump(flavorData, fd, protocol=2)
fd.close()
with utils.AtomicFile(file, "wb") as fd:
pickle.dump(flavorData, fd, protocol=2)
# This could fail if another process deleted the file immediately.
self.modtimes[file] = os.stat(file).st_mtime

def export(self):
Expand Down Expand Up @@ -690,9 +697,8 @@ def reload(self, flavors=None, persistDir=None, verbose=0):
for flavor in flavors:
fileName = self._persistPath(flavor,persistDir)
self.modtimes[fileName] = os.stat(fileName).st_mtime
fd = open(fileName, "rb")
lookup = pickle.load(fd)
fd.close()
with open(fileName, "rb") as fd:
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code is littered with the old style opens. I have only fixed the one that was in the same file as I was editing.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what I'm supposed to do about the race in this code. Maybe it's a bit early in the day for me but that code gets the file name from the flavor, calls stat without checking if the cache file is there, then tries to open it. I think the code is assuming that the reload call is gated by a check higher up.

lookup = pickle.load(fd)

self.lookup[flavor] = lookup

Expand Down Expand Up @@ -886,4 +892,3 @@ def __init__(self, files=None, flavors=None, maxsave=None, msg=None):
self.files = files
self.flavors = flavors
self.maxsave = maxsave

80 changes: 41 additions & 39 deletions python/eups/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""
Utility functions used across EUPS classes.
"""
import contextlib
import time
import os
import sys
Expand Down Expand Up @@ -883,54 +884,55 @@ def cmp_prods_and_none(a, b):
raise RuntimeError("A cyclic dependency exists amongst %s" %
" ".join(sorted([name([x for x in p]) for p in graph.keys()])))

@contextlib.contextmanager
def AtomicFile(fn: str, mode: str):
    """Guarantee atomic writes to a file.

    Parameters
    ----------
    fn : `str`
        Name of file to write.
    mode : `str`
        Write mode; must be ``"w"`` or ``"wb"``.

    Returns
    -------
    fd : `typing.IO`
        File handle to use for writing.

    Raises
    ------
    ValueError
        Raised if ``mode`` is not a supported write mode.

    Notes
    -----
    A file to which all the changes (writes) are committed all at once,
    or not at all. Useful for avoiding race conditions where a reader
    may be trying to read from a file that's still being written to.

    This is accomplished by creating a temporary file into which all the
    writes are directed, and then renaming it to the destination
    filename on exit. On POSIX-compliant filesystems, the rename is
    guaranteed to be atomic.

    Should be used as a context manager.

    .. code-block:: python

        with AtomicFile("myfile.txt", "w") as fd:
            print("Some text", file=fd)
    """
    if mode not in ("w", "wb"):
        # Read or append modes make no sense for a write-then-rename file.
        raise ValueError(f"Unsupported mode for AtomicFile: {mode!r}")

    # Create the temporary file in the destination directory so the final
    # os.rename() stays on one filesystem and therefore remains atomic.
    dirname = os.path.dirname(fn)
    fh, tmpfn = tempfile.mkstemp(suffix='.tmp', dir=dirname)
    fp = os.fdopen(fh, mode)

    try:
        yield fp
    except BaseException:
        # The caller's write failed: commit nothing.  Close the handle and
        # remove the temporary file so no partial data or residue is left.
        fp.close()
        with contextlib.suppress(OSError):
            os.remove(tmpfn)
        raise

    # Flush Python's buffer before fsync, otherwise buffered data would
    # not be covered by the sync.  The fsync itself is needed because
    # fclose() doesn't guarantee fsync() in POSIX, which may lead to
    # interesting issues (e.g., see
    # http://thunk.org/tytso/blog/2009/03/12/delayed-allocation-and-the-zero-length-file-problem/ )
    fp.flush()
    os.fsync(fh)
    fp.close()
    os.rename(tmpfn, fn)

def isSubpath(path, root):
"""!Return True if path is root or in root
Expand Down
Loading